You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:51 UTC
[11/12] samza git commit: Consolidating package names for System,
Stream, Application and Table descriptors.
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
deleted file mode 100644
index de40932..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.serializers.Serde;
-
-
-/**
- * Interface for simple {@code SystemDescriptors} that return {@code InputDescriptors} parameterized by the type of
- * the provided stream level serde.
- */
-public interface SimpleInputDescriptorProvider {
-
- /**
- * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
- * stream level serde.
- * <p>
- * The type of messages in the stream is the type of the provided stream level serde.
- *
- * @param streamId id of the input stream
- * @param serde stream level serde for the input stream
- * @param <StreamMessageType> type of messages in this stream
- * @return an {@link InputDescriptor} for the input stream
- */
- <StreamMessageType> InputDescriptor<StreamMessageType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde<StreamMessageType> serde);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
deleted file mode 100644
index 4bb121d..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import com.google.common.base.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.system.SystemStreamMetadata.OffsetType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The base descriptor for a system. Allows setting properties that are common to all systems.
- * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
- * <p>
- * Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
- * {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
- * to another message that is delivered to the {@code MessageStream}. It is applied at runtime in
- * {@code InputOperatorImpl}.
- * <p>
- * Systems may provide a {@link StreamExpander} to be used for input streams on the system. A {@link StreamExpander}
- * expands the provided {@code InputDescriptor} to a sub-DAG of one or more operators on the {@code StreamGraph},
- * and returns a new {@code MessageStream} with the combined results. It is called during graph description
- * in {@code StreamGraph#getInputStream}.
- * <p>
- * Systems that support consuming messages from a stream should provide users means of obtaining an
- * {@code InputDescriptor}. Recommended interfaces for doing so are {@link TransformingInputDescriptorProvider} for
- * systems that support system level {@link InputTransformer}, {@link ExpandingInputDescriptorProvider} for systems
- * that support system level {@link StreamExpander} functions, and {@link SimpleInputDescriptorProvider} otherwise.
- * <p>
- * Systems that support producing messages to a stream should provide users means of obtaining an
- * {@code OutputDescriptor}. Recommended interface for doing so is {@link OutputDescriptorProvider}.
- * <p>
- * It is not required for SystemDescriptors to implement one of the Provider interfaces above. System implementers
- * may choose to expose additional or alternate APIs for obtaining Input/Output Descriptors by extending
- * SystemDescriptor directly.
- *
- * @param <SubClass> type of the concrete sub-class
- */
-public abstract class SystemDescriptor<SubClass extends SystemDescriptor<SubClass>> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SystemDescriptor.class);
- private static final String FACTORY_CONFIG_KEY = "systems.%s.samza.factory";
- private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default";
- private static final String DEFAULT_STREAM_CONFIGS_CONFIG_KEY = "systems.%s.default.stream.%s";
- private static final String SYSTEM_CONFIGS_CONFIG_KEY = "systems.%s.%s";
- private static final Pattern SYSTEM_NAME_PATTERN = Pattern.compile("[\\d\\w-_]+");
-
- private final String systemName;
- private final Optional<String> factoryClassNameOptional;
- private final Optional<InputTransformer> transformerOptional;
- private final Optional<StreamExpander> expanderOptional;
-
- private final Map<String, String> systemConfigs = new HashMap<>();
- private final Map<String, String> defaultStreamConfigs = new HashMap<>();
- private Optional<OffsetType> defaultStreamOffsetDefaultOptional = Optional.empty();
-
- /**
- * Constructs a {@link SystemDescriptor} instance.
- *
- * @param systemName name of this system
- * @param factoryClassName name of the SystemFactory class for this system
- * @param transformer the {@link InputTransformer} for the system if any, else null
- * @param expander the {@link StreamExpander} for the system if any, else null
- */
- public SystemDescriptor(String systemName, String factoryClassName, InputTransformer transformer, StreamExpander expander) {
- Preconditions.checkArgument(isValidSystemName(systemName),
- String.format("systemName: %s must be non-empty and must not contain spaces or special characters.", systemName));
- if (StringUtils.isBlank(factoryClassName)) {
- LOGGER.warn("Blank SystemFactory class name for system: {}. A value must be provided in configuration using {}.",
- systemName, String.format(FACTORY_CONFIG_KEY, systemName));
- }
- this.systemName = systemName;
- this.factoryClassNameOptional = Optional.ofNullable(StringUtils.stripToNull(factoryClassName));
- this.transformerOptional = Optional.ofNullable(transformer);
- this.expanderOptional = Optional.ofNullable(expander);
- }
-
- /**
- * If a container starts up without a checkpoint, this property determines where in the input stream we should start
- * consuming. The value must be an {@link OffsetType}, one of the following:
- * <ul>
- * <li>upcoming: Start processing messages that are published after the job starts.
- * Any messages published while the job was not running are not processed.
- * <li>oldest: Start processing at the oldest available message in the system,
- * and reprocess the entire available message history.
- * </ul>
- * This property is for all streams obtained using this system descriptor. To set it for an individual stream,
- * see {@link org.apache.samza.operators.descriptors.base.stream.InputDescriptor#withOffsetDefault}.
- * If both are defined, the stream-level definition takes precedence.
- *
- * @param offsetType offset type to start processing from
- * @return this system descriptor
- */
- public SubClass withDefaultStreamOffsetDefault(OffsetType offsetType) {
- this.defaultStreamOffsetDefaultOptional = Optional.ofNullable(offsetType);
- return (SubClass) this;
- }
-
- /**
- * Additional system-specific properties for this system.
- * <p>
- * These properties are added under the {@code systems.system-name.*} scope.
- *
- * @param systemConfigs system-specific properties for this system
- * @return this system descriptor
- */
- public SubClass withSystemConfigs(Map<String, String> systemConfigs) {
- this.systemConfigs.putAll(systemConfigs);
- return (SubClass) this;
- }
-
- /**
- * Default properties for any stream obtained using this system descriptor.
- * <p>
- * For example, if "systems.kafka-system.default.stream.replication.factor"=2 was configured,
- * then every Kafka stream created on the kafka-system will have a replication factor of 2
- * unless the property is explicitly overridden using the stream descriptor.
- *
- * @param defaultStreamConfigs default stream properties
- * @return this system descriptor
- */
- public SubClass withDefaultStreamConfigs(Map<String, String> defaultStreamConfigs) {
- this.defaultStreamConfigs.putAll(defaultStreamConfigs);
- return (SubClass) this;
- }
-
- public String getSystemName() {
- return this.systemName;
- }
-
- public Optional<InputTransformer> getTransformer() {
- return this.transformerOptional;
- }
-
- public Optional<StreamExpander> getExpander() {
- return this.expanderOptional;
- }
-
- private boolean isValidSystemName(String id) {
- return StringUtils.isNotBlank(id) && SYSTEM_NAME_PATTERN.matcher(id).matches();
- }
-
- public Map<String, String> toConfig() {
- HashMap<String, String> configs = new HashMap<>();
- this.factoryClassNameOptional.ifPresent(name -> configs.put(String.format(FACTORY_CONFIG_KEY, systemName), name));
- this.defaultStreamOffsetDefaultOptional.ifPresent(dsod ->
- configs.put(String.format(DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, systemName), dsod.name().toLowerCase()));
- this.defaultStreamConfigs.forEach((key, value) ->
- configs.put(String.format(DEFAULT_STREAM_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
- this.systemConfigs.forEach((key, value) ->
- configs.put(String.format(SYSTEM_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
- return configs;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
deleted file mode 100644
index 5b43fbd..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to
- * their own {@code InputTransformer} function result types.
- *
- * @param <InputTransformerType> type of the system level {@code InputTransformer} results
- */
-public interface TransformingInputDescriptorProvider<InputTransformerType> {
-
- /**
- * Gets a {@link InputDescriptor} for an input stream on this system. The stream has the provided
- * stream level serde, and the default system level {@code InputTransformer}.
- * <p>
- * The type of messages in the stream is the type of messages returned by the default system level
- * {@code InputTransformer}
- *
- * @param streamId id of the input stream
- * @param serde stream level serde for the input stream
- * @return an {@link InputDescriptor} for the input stream
- */
- InputDescriptor<InputTransformerType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
index 12823cc..d3c1a23 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
@@ -24,12 +24,12 @@ import org.apache.samza.annotation.InterfaceStability;
/**
* A function that can be closed after its execution.
- *
- * <p> Implement {@link #close()} to free resources used during the execution of the function, clean up state etc.
- *
- * <p> Order of finalization: {@link ClosableFunction}s are closed in the reverse topological order of operators in the
- * {@link org.apache.samza.application.StreamApplicationDescriptor}. For any two operators A and B in the graph, if operator B
- * consumes results from operator A, then operator B is guaranteed to be closed before operator A.
+ * <p>
+ * Implement {@link #close()} to free resources used during the execution of the function, clean up state etc.
+ * <p>
+ * Order of finalization: {@link ClosableFunction}s are closed in the reverse topological order of operators in the
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor}. For any two operators A and B in the
+ * graph, if operator B consumes results from operator A, then operator B is guaranteed to be closed before operator A.
*
*/
@InterfaceStability.Evolving
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
index 7f950de..c808876 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
@@ -24,10 +24,11 @@ import org.apache.samza.context.Context;
/**
* A function that can be initialized before execution.
- *
- * <p> Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the
- * {@link org.apache.samza.application.StreamApplicationDescriptor}. For any two operators A and B in the graph, if operator B
- * consumes results from operator A, then operator A is guaranteed to be initialized before operator B.
+ * <p>
+ * Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor}. For any two operators A and B in the
+ * graph, if operator B consumes results from operator A, then operator A is guaranteed to be initialized before
+ * operator B.
*
*/
@InterfaceStability.Evolving
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
deleted file mode 100644
index 704d187..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.functions;
-
-import java.io.Serializable;
-import org.apache.samza.system.IncomingMessageEnvelope;
-
-/**
- * Transforms an {@link IncomingMessageEnvelope} with deserialized key and message to a message of type {@code OM}
- * which is delivered to the {@code MessageStream}. Called in {@code InputOperatorImpl} when incoming messages
- * from a {@code SystemConsumer} are being delivered to the application.
- * <p>
- * This is provided by default by transforming system descriptor implementations and can not be overridden
- * or set on a per stream level.
- *
- * @param <OM> type of the transformed message
- */
-public interface InputTransformer<OM> extends InitableFunction, ClosableFunction, Serializable {
-
- /**
- * Transforms the provided {@link IncomingMessageEnvelope} with deserialized key and message into another message
- * which is delivered to the {@code MessageStream}.
- *
- * @param ime the {@link IncomingMessageEnvelope} to be transformed
- * @return the transformed message
- */
- OM apply(IncomingMessageEnvelope ime);
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
deleted file mode 100644
index 7bbf601..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.functions;
-
-import java.io.Serializable;
-import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-
-/**
- * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
- * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamApplicationDescriptor#getInputStream}
- * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an expanding system descriptor.
- * <p>
- * This is provided by default by expanding system descriptor implementations and can not be overridden
- * or set on a per stream level.
- *
- * @param <OM> type of the messages in the resultant {@link MessageStream}
- */
-public interface StreamExpander<OM> extends Serializable {
-
- /**
- * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
- * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor}
- * is being used to get an {@link MessageStream} using {@link StreamApplicationDescriptor#getInputStream}.
- * <p>
- * Notes for system implementers:
- * <p>
- * Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call
- * {@link StreamApplicationDescriptor#getInputStream} with an {@link InputDescriptor} from an expanding system descriptor
- * (like this one) again.
- * <p>
- * It's the {@link StreamExpander}'s responsibility to propagate any properties, including serde, from the
- * user-provided {@link InputDescriptor} to the expanded input descriptors.
- *
- * @param streamAppDesc the {@link StreamApplicationDescriptor} to register the expanded sub-DAG of operators on
- * @param inputDescriptor the {@link InputDescriptor} to be expanded
- * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
- */
- MessageStream<OM> apply(StreamApplicationDescriptor streamAppDesc, InputDescriptor inputDescriptor);
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/ExpandingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/ExpandingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/ExpandingInputDescriptorProvider.java
new file mode 100644
index 0000000..d180e1e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/ExpandingInputDescriptorProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to
+ * their own {@code StreamExpander} function result types.
+ *
+ * @param <StreamExpanderType> type of the system level {@code StreamExpander} results
+ */
+public interface ExpandingInputDescriptorProvider<StreamExpanderType> {
+
+ /**
+ * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
+ * stream level serde, and the default system level {@code StreamExpander}.
+ * <p>
+ * The type of messages in the stream is the type of messages returned by the default system level
+ * {@code StreamExpander}.
+ *
+ * @param streamId id of the input stream
+ * @param serde stream level serde to be propagated to expanded input streams
+ * @return an {@link InputDescriptor} for the input stream
+ */
+ InputDescriptor<StreamExpanderType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
new file mode 100644
index 0000000..6d18afc
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a generic input stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured
+ * {@link GenericSystemDescriptor}.
+ * <p>
+ * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
+ * Otherwise, this {@link GenericInputDescriptor} may be used to provide Samza-specific properties of the input stream.
+ * Additional system stream specific properties may be provided using {@link #withStreamConfigs}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public final class GenericInputDescriptor<StreamMessageType>
+ extends InputDescriptor<StreamMessageType, GenericInputDescriptor<StreamMessageType>> {
+ GenericInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
+ super(streamId, serde, systemDescriptor, null);
+ }
+
+ @Override
+ public GenericInputDescriptor<StreamMessageType> withPhysicalName(String physicalName) {
+ return super.withPhysicalName(physicalName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
new file mode 100644
index 0000000..5fe3a98
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a generic output stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured
+ * {@link GenericSystemDescriptor}.
+ * <p>
+ * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
+ * Otherwise, this {@link GenericOutputDescriptor} may be used to provide Samza-specific properties of the output stream.
+ * Additional system stream specific properties may be provided using {@link #withStreamConfigs}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public final class GenericOutputDescriptor<StreamMessageType>
+ extends OutputDescriptor<StreamMessageType, GenericOutputDescriptor<StreamMessageType>> {
+ GenericOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
+ super(streamId, serde, systemDescriptor);
+ }
+
+ @Override
+ public GenericOutputDescriptor<StreamMessageType> withPhysicalName(String physicalName) {
+ return super.withPhysicalName(physicalName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
new file mode 100644
index 0000000..59b2a12
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a generic system.
+ * <p>
+ * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
+ * Otherwise, this {@link GenericSystemDescriptor} may be used to provide Samza-specific properties of the system.
+ * Additional system-specific properties may be provided using {@link #withSystemConfigs}.
+ * <p>
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ */
+public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor>
+ implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+
+ /**
+ * Constructs a {@link GenericSystemDescriptor} instance with no system level serde.
+ * Serdes must be provided explicitly at stream level when getting input or output descriptors.
+ *
+ * @param systemName name of this system
+ * @param factoryClassName name of the SystemFactory class for this system
+ */
+ public GenericSystemDescriptor(String systemName, String factoryClassName) {
+ super(systemName, factoryClassName, null, null);
+ }
+
+ @Override
+ public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
+ String streamId, Serde<StreamMessageType> serde) {
+ return new GenericInputDescriptor<>(streamId, this, serde);
+ }
+
+ @Override
+ public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
+ String streamId, Serde<StreamMessageType> serde) {
+ return new GenericOutputDescriptor<>(streamId, this, serde);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
new file mode 100644
index 0000000..2c0ca88
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamMetadata.OffsetType;
+
+/**
+ * The base descriptor for an input stream. Allows setting properties that are common to all input streams.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class InputDescriptor<StreamMessageType, SubClass extends InputDescriptor<StreamMessageType, SubClass>>
+ extends StreamDescriptor<StreamMessageType, SubClass> {
+ private static final String RESET_OFFSET_CONFIG_KEY = "streams.%s.samza.reset.offset";
+ private static final String OFFSET_DEFAULT_CONFIG_KEY = "streams.%s.samza.offset.default";
+ private static final String PRIORITY_CONFIG_KEY = "streams.%s.samza.priority";
+ private static final String BOOTSTRAP_CONFIG_KEY = "streams.%s.samza.bootstrap";
+ private static final String BOUNDED_CONFIG_KEY = "streams.%s.samza.bounded";
+ private static final String DELETE_COMMITTED_MESSAGES_CONFIG_KEY = "streams.%s.samza.delete.committed.messages";
+
+ private final Optional<InputTransformer> transformerOptional;
+
+ private Optional<Boolean> resetOffsetOptional = Optional.empty();
+ private Optional<OffsetType> offsetDefaultOptional = Optional.empty();
+ private Optional<Integer> priorityOptional = Optional.empty();
+ private Optional<Boolean> isBootstrapOptional = Optional.empty();
+ private Optional<Boolean> isBoundedOptional = Optional.empty();
+ private Optional<Boolean> deleteCommittedMessagesOptional = Optional.empty();
+
+ /**
+ * Constructs an {@link InputDescriptor} instance.
+ *
+ * @param streamId id of the stream
+ * @param serde serde for messages in the stream
+ * @param systemDescriptor system descriptor this stream descriptor was obtained from
+ * @param transformer stream level input stream transform function if available, else null
+ */
+ public InputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor, InputTransformer transformer) {
+ super(streamId, serde, systemDescriptor);
+
+ // stream level transformer takes precedence over system level transformer
+ if (transformer != null) {
+ this.transformerOptional = Optional.of(transformer);
+ } else {
+ this.transformerOptional = systemDescriptor.getTransformer();
+ }
+ }
+
+ /**
+ * If set, when a Samza container starts up, it ignores any checkpointed offset for this particular
+ * input stream. Its behavior is thus determined by the {@link #withOffsetDefault} setting.
+ * Note that the reset takes effect every time a container is started, which may be every time you restart your job,
+ * or more frequently if a container fails and is restarted by the framework.
+ *
+ * @return this input descriptor
+ */
+ public SubClass shouldResetOffset() {
+ this.resetOffsetOptional = Optional.of(true);
+ return (SubClass) this;
+ }
+
+ /**
+ * If a container starts up without a checkpoint, this property determines where in the input stream we should start
+ * consuming. The value must be an OffsetType, one of the following:
+ * <ul>
+ * <li>upcoming: Start processing messages that are published after the job starts.
+ * Any messages published while the job was not running are not processed.
+ * <li>oldest: Start processing at the oldest available message in the system,
+ * and reprocess the entire available message history.
+ * </ul>
+ * This property is for an individual stream. To set it for all streams within a system, see
+ * {@link SystemDescriptor#withDefaultStreamOffsetDefault}. If both are defined, the stream-level definition
+ * takes precedence.
+ *
+ * @param offsetDefault offset type to start processing from
+ * @return this input descriptor
+ */
+ public SubClass withOffsetDefault(OffsetType offsetDefault) {
+ this.offsetDefaultOptional = Optional.ofNullable(offsetDefault);
+ return (SubClass) this;
+ }
+
+ /**
+ * If one or more streams have a priority set (any positive integer), they will be processed with higher priority
+ * than the other streams.
+ * <p>
+ * You can set several streams to the same priority, or define multiple priority levels by assigning a
+ * higher number to the higher-priority streams.
+ * <p>
+ * If a higher-priority stream has any messages available, they will always be processed first;
+ * messages from lower-priority streams are only processed when there are no new messages on higher-priority inputs.
+ *
+ * @param priority priority for this input stream
+ * @return this input descriptor
+ */
+ public SubClass withPriority(int priority) {
+ this.priorityOptional = Optional.of(priority);
+ return (SubClass) this;
+ }
+
+ /**
+ * If set, this stream will be processed as a bootstrap stream. This means that every time a Samza container
+ * starts up, this stream will be fully consumed before messages from any other stream are processed.
+ *
+ * @return this input descriptor
+ */
+ public SubClass shouldBootstrap() {
+ this.isBootstrapOptional = Optional.of(true);
+ return (SubClass) this;
+ }
+
+ /**
+ * If set, this stream will be considered a bounded stream. If all input streams in an application are
+ * bounded, the job is considered to be running in batch processing mode.
+ *
+ * @return this input descriptor
+ */
+ public SubClass isBounded() {
+ this.isBoundedOptional = Optional.of(true);
+ return (SubClass) this;
+ }
+
+ /**
+ * If set, and supported by the system implementation, messages older than the latest checkpointed offset
+ * for this stream may be deleted after the commit.
+ *
+ * @return this input descriptor
+ */
+ public SubClass shouldDeleteCommittedMessages() {
+ this.deleteCommittedMessagesOptional = Optional.of(true);
+ return (SubClass) this;
+ }
+
+ public Optional<InputTransformer> getTransformer() {
+ return this.transformerOptional;
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+ HashMap<String, String> configs = new HashMap<>(super.toConfig());
+ String streamId = getStreamId();
+ this.offsetDefaultOptional.ifPresent(od ->
+ configs.put(String.format(OFFSET_DEFAULT_CONFIG_KEY, streamId), od.name().toLowerCase()));
+ this.resetOffsetOptional.ifPresent(resetOffset ->
+ configs.put(String.format(RESET_OFFSET_CONFIG_KEY, streamId), Boolean.toString(resetOffset)));
+ this.priorityOptional.ifPresent(priority ->
+ configs.put(String.format(PRIORITY_CONFIG_KEY, streamId), Integer.toString(priority)));
+ this.isBootstrapOptional.ifPresent(bootstrap ->
+ configs.put(String.format(BOOTSTRAP_CONFIG_KEY, streamId), Boolean.toString(bootstrap)));
+ this.isBoundedOptional.ifPresent(bounded ->
+ configs.put(String.format(BOUNDED_CONFIG_KEY, streamId), Boolean.toString(bounded)));
+ this.deleteCommittedMessagesOptional.ifPresent(deleteCommittedMessages ->
+ configs.put(String.format(DELETE_COMMITTED_MESSAGES_CONFIG_KEY, streamId),
+ Boolean.toString(deleteCommittedMessages)));
+ return Collections.unmodifiableMap(configs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/InputTransformer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputTransformer.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputTransformer.java
new file mode 100644
index 0000000..4435f06
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputTransformer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import java.io.Serializable;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+/**
+ * Transforms an {@link IncomingMessageEnvelope} with deserialized key and message to a message of type {@code OM}
+ * which is delivered to the {@code MessageStream}. Called in {@code InputOperatorImpl} when incoming messages
+ * from a {@code SystemConsumer} are being delivered to the application.
+ * <p>
+ * This is provided by default by transforming system descriptor implementations and cannot be overridden
+ * or set at a per-stream level.
+ *
+ * @param <OM> type of the transformed message
+ */
+public interface InputTransformer<OM> extends InitableFunction, ClosableFunction, Serializable {
+
+ /**
+ * Transforms the provided {@link IncomingMessageEnvelope} with deserialized key and message into another message
+ * which is delivered to the {@code MessageStream}.
+ *
+ * @param ime the {@link IncomingMessageEnvelope} to be transformed
+ * @return the transformed message
+ */
+ OM apply(IncomingMessageEnvelope ime);
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
new file mode 100644
index 0000000..70a5d0f
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * The base descriptor for an output stream. Allows setting properties that are common to all output streams.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class OutputDescriptor<StreamMessageType, SubClass extends OutputDescriptor<StreamMessageType, SubClass>>
+ extends StreamDescriptor<StreamMessageType, SubClass> {
+ /**
+ * Constructs an {@link OutputDescriptor} instance.
+ *
+ * @param streamId id of the stream
+ * @param serde serde for messages in the stream
+ * @param systemDescriptor system descriptor this stream descriptor was obtained from
+ */
+ public OutputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
+ super(streamId, serde, systemDescriptor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptorProvider.java
new file mode 100644
index 0000000..2ebfe79
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptorProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+
+/**
+ * Interface for simple {@code SystemDescriptors} that return {@code OutputDescriptors} parameterized by the type of
+ * the provided stream level serde.
+ */
+public interface OutputDescriptorProvider {
+
+ /**
+ * Gets an {@link OutputDescriptor} representing an output stream on this system that uses the provided
+ * stream specific serde instead of the default system serde.
+ * <p>
+ * An {@code OutputStream<KV<K, V>>}, obtained using a descriptor with a {@code KVSerde<K, V>}, can send messages
+ * of type {@code KV<K, V>}. An {@code OutputStream<M>} with any other {@code Serde<M>} can send messages of
+ * type {@code M} without a key.
+ * <p>
+ * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used if the {@code SystemProducer}
+ * serializes the outgoing messages itself, and no prior serialization is required from the framework.
+ *
+ * @param streamId id of the output stream
+ * @param serde serde for this output stream that overrides the default system serde, if any.
+ * @param <StreamMessageType> type of messages in the output stream
+ * @return the {@link OutputDescriptor} for the output stream
+ */
+ <StreamMessageType> OutputDescriptor<StreamMessageType, ? extends OutputDescriptor> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/SimpleInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/SimpleInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/SimpleInputDescriptorProvider.java
new file mode 100644
index 0000000..ed1c6a4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/SimpleInputDescriptorProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+
+/**
+ * Interface for simple {@code SystemDescriptors} that return {@code InputDescriptors} parameterized by the type of
+ * the provided stream level serde.
+ */
+public interface SimpleInputDescriptorProvider {
+
+ /**
+ * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
+ * stream level serde.
+ * <p>
+ * The type of messages in the stream is the type of the provided stream level serde.
+ *
+ * @param streamId id of the input stream
+ * @param serde stream level serde for the input stream
+ * @param <StreamMessageType> type of messages in this stream
+ * @return an {@link InputDescriptor} for the input stream
+ */
+ <StreamMessageType> InputDescriptor<StreamMessageType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde<StreamMessageType> serde);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
new file mode 100644
index 0000000..d2b25f9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * The base descriptor for an input or output stream. Allows setting properties that are common to all streams.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class StreamDescriptor<StreamMessageType, SubClass extends StreamDescriptor<StreamMessageType, SubClass>> {
+ private static final String SYSTEM_CONFIG_KEY = "streams.%s.samza.system";
+ private static final String PHYSICAL_NAME_CONFIG_KEY = "streams.%s.samza.physical.name";
+ private static final String STREAM_CONFIGS_CONFIG_KEY = "streams.%s.%s";
+ private static final Pattern STREAM_ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+ private final String streamId;
+ private final Serde serde;
+ private final SystemDescriptor systemDescriptor;
+
+ private final Map<String, String> streamConfigs = new HashMap<>();
+ private Optional<String> physicalNameOptional = Optional.empty();
+
+ /**
+ * Constructs a {@link StreamDescriptor} instance.
+ *
+ * @param streamId id of the stream
+ * @param serde serde for messages in the stream
+ * @param systemDescriptor system descriptor this stream descriptor was obtained from
+ */
+ StreamDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
+ Preconditions.checkArgument(systemDescriptor != null,
+ String.format("SystemDescriptor must not be null. streamId: %s", streamId));
+ String systemName = systemDescriptor.getSystemName();
+ Preconditions.checkState(isValidStreamId(streamId),
+ String.format("streamId must be non-empty and must not contain spaces or special characters. " +
+ "streamId: %s, systemName: %s", streamId, systemName));
+ Preconditions.checkArgument(serde != null,
+ String.format("Serde must not be null. streamId: %s systemName: %s", streamId, systemName));
+ this.streamId = streamId;
+ this.serde = serde;
+ this.systemDescriptor = systemDescriptor;
+ }
+
+ /**
+ * The physical name of the stream on the system on which this stream will be accessed.
+ * This is opposed to the {@code streamId} which is the logical name that Samza uses to identify the stream.
+ * <p>
+ * A physical name could be a Kafka topic name, an HDFS file URN, or any other system-specific identifier.
+ * <p>
+ * If not provided, the logical {@code streamId} is used as the physical name.
+ *
+ * @param physicalName physical name for this stream.
+ * @return this stream descriptor.
+ */
+ protected SubClass withPhysicalName(String physicalName) {
+ this.physicalNameOptional = Optional.ofNullable(physicalName);
+ return (SubClass) this;
+ }
+
+ /**
+ * Additional system-specific properties for this stream.
+ * <p>
+ * These properties are added under the {@code streams.stream-id.*} scope.
+ *
+ * @param streamConfigs system-specific properties for this stream
+ * @return this stream descriptor
+ */
+ public SubClass withStreamConfigs(Map<String, String> streamConfigs) {
+ this.streamConfigs.putAll(streamConfigs);
+ return (SubClass) this;
+ }
+
+ public String getStreamId() {
+ return this.streamId;
+ }
+
+ public String getSystemName() {
+ return this.systemDescriptor.getSystemName();
+ }
+
+ public Serde getSerde() {
+ return this.serde;
+ }
+
+ public SystemDescriptor getSystemDescriptor() {
+ return this.systemDescriptor;
+ }
+
+ public Optional<String> getPhysicalName() {
+ return physicalNameOptional;
+ }
+
+ private boolean isValidStreamId(String id) {
+ return StringUtils.isNotBlank(id) && STREAM_ID_PATTERN.matcher(id).matches();
+ }
+
+ public Map<String, String> toConfig() {
+ HashMap<String, String> configs = new HashMap<>();
+ configs.put(String.format(SYSTEM_CONFIG_KEY, streamId), getSystemName());
+ this.physicalNameOptional.ifPresent(physicalName ->
+ configs.put(String.format(PHYSICAL_NAME_CONFIG_KEY, streamId), physicalName));
+ this.streamConfigs.forEach((key, value) ->
+ configs.put(String.format(STREAM_CONFIGS_CONFIG_KEY, streamId, key), value));
+ return Collections.unmodifiableMap(configs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamExpander.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamExpander.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamExpander.java
new file mode 100644
index 0000000..4b8e6f0
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamExpander.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import java.io.Serializable;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.operators.MessageStream;
+
+/**
+ * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
+ * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamApplicationDescriptor#getInputStream}
+ * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an expanding system descriptor.
+ * <p>
+ * This is provided by default by expanding system descriptor implementations and cannot be overridden
+ * or set at a per-stream level.
+ *
+ * @param <OM> type of the messages in the resultant {@link MessageStream}
+ */
+public interface StreamExpander<OM> extends Serializable {
+
+ /**
+ * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
+ * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor}
+ * is being used to get a {@link MessageStream} using {@link StreamApplicationDescriptor#getInputStream}.
+ * <p>
+ * Notes for system implementers:
+ * <p>
+ * Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call
+ * {@link StreamApplicationDescriptor#getInputStream} with an {@link InputDescriptor} from an expanding system descriptor
+ * (like this one) again.
+ * <p>
+ * It's the {@link StreamExpander}'s responsibility to propagate any properties, including serde, from the
+ * user-provided {@link InputDescriptor} to the expanded input descriptors.
+ *
+ * @param streamAppDesc the {@link StreamApplicationDescriptor} to register the expanded sub-DAG of operators on
+ * @param inputDescriptor the {@link InputDescriptor} to be expanded
+ * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
+ */
+ MessageStream<OM> apply(StreamApplicationDescriptor streamAppDesc, InputDescriptor inputDescriptor);
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
new file mode 100644
index 0000000..4b93a32
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.system.SystemStreamMetadata.OffsetType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The base descriptor for a system. Allows setting properties that are common to all systems.
+ * <p>
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * <p>
+ * Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
+ * {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
+ * to another message that is delivered to the {@code MessageStream}. It is applied at runtime in
+ * {@code InputOperatorImpl}.
+ * <p>
+ * Systems may provide a {@link StreamExpander} to be used for input streams on the system. A {@link StreamExpander}
+ * expands the provided {@code InputDescriptor} to a sub-DAG of one or more operators on the
+ * {@code StreamApplicationDescriptor}, and returns a new {@code MessageStream} with the combined results.
+ * It is called during graph description in {@code StreamApplicationDescriptor#getInputStream}.
+ * <p>
+ * Systems that support consuming messages from a stream should provide users means of obtaining an
+ * {@code InputDescriptor}. Recommended interfaces for doing so are {@link TransformingInputDescriptorProvider} for
+ * systems that support system level {@link InputTransformer}, {@link ExpandingInputDescriptorProvider} for systems
+ * that support system level {@link StreamExpander} functions, and {@link SimpleInputDescriptorProvider} otherwise.
+ * <p>
+ * Systems that support producing messages to a stream should provide users means of obtaining an
+ * {@code OutputDescriptor}. Recommended interface for doing so is {@link OutputDescriptorProvider}.
+ * <p>
+ * It is not required for SystemDescriptors to implement one of the Provider interfaces above. System implementers
+ * may choose to expose additional or alternate APIs for obtaining Input/Output Descriptors by extending
+ * SystemDescriptor directly.
+ *
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class SystemDescriptor<SubClass extends SystemDescriptor<SubClass>> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SystemDescriptor.class);
+ private static final String FACTORY_CONFIG_KEY = "systems.%s.samza.factory";
+ private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default";
+ private static final String DEFAULT_STREAM_CONFIGS_CONFIG_KEY = "systems.%s.default.stream.%s";
+ private static final String SYSTEM_CONFIGS_CONFIG_KEY = "systems.%s.%s";
+ private static final Pattern SYSTEM_NAME_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+ private final String systemName;
+ private final Optional<String> factoryClassNameOptional;
+ private final Optional<InputTransformer> transformerOptional;
+ private final Optional<StreamExpander> expanderOptional;
+
+ private final Map<String, String> systemConfigs = new HashMap<>();
+ private final Map<String, String> defaultStreamConfigs = new HashMap<>();
+ private Optional<OffsetType> defaultStreamOffsetDefaultOptional = Optional.empty();
+
+ /**
+ * Constructs a {@link SystemDescriptor} instance.
+ *
+ * @param systemName name of this system
+ * @param factoryClassName name of the SystemFactory class for this system
+ * @param transformer the {@link InputTransformer} for the system if any, else null
+ * @param expander the {@link StreamExpander} for the system if any, else null
+ */
+ public SystemDescriptor(String systemName, String factoryClassName, InputTransformer transformer, StreamExpander expander) {
+ Preconditions.checkArgument(isValidSystemName(systemName),
+ String.format("systemName: %s must be non-empty and must not contain spaces or special characters.", systemName));
+ if (StringUtils.isBlank(factoryClassName)) {
+ LOGGER.warn("Blank SystemFactory class name for system: {}. A value must be provided in configuration using {}.",
+ systemName, String.format(FACTORY_CONFIG_KEY, systemName));
+ }
+ this.systemName = systemName;
+ this.factoryClassNameOptional = Optional.ofNullable(StringUtils.stripToNull(factoryClassName));
+ this.transformerOptional = Optional.ofNullable(transformer);
+ this.expanderOptional = Optional.ofNullable(expander);
+ }
+
+ /**
+ * If a container starts up without a checkpoint, this property determines where in the input stream we should start
+ * consuming. The value must be an {@link OffsetType}, one of the following:
+ * <ul>
+ * <li>upcoming: Start processing messages that are published after the job starts.
+ * Any messages published while the job was not running are not processed.
+ * <li>oldest: Start processing at the oldest available message in the system,
+ * and reprocess the entire available message history.
+ * </ul>
+ * This property is for all streams obtained using this system descriptor. To set it for an individual stream,
+ * see {@link InputDescriptor#withOffsetDefault}.
+ * If both are defined, the stream-level definition takes precedence.
+ *
+ * @param offsetType offset type to start processing from
+ * @return this system descriptor
+ */
+ public SubClass withDefaultStreamOffsetDefault(OffsetType offsetType) {
+ this.defaultStreamOffsetDefaultOptional = Optional.ofNullable(offsetType);
+ return (SubClass) this;
+ }
+
+ /**
+ * Additional system-specific properties for this system.
+ * <p>
+ * These properties are added under the {@code systems.system-name.*} scope.
+ *
+ * @param systemConfigs system-specific properties for this system
+ * @return this system descriptor
+ */
+ public SubClass withSystemConfigs(Map<String, String> systemConfigs) {
+ this.systemConfigs.putAll(systemConfigs);
+ return (SubClass) this;
+ }
+
+ /**
+ * Default properties for any stream obtained using this system descriptor.
+ * <p>
+ * For example, if "systems.kafka-system.default.stream.replication.factor"=2 was configured,
+ * then every Kafka stream created on the kafka-system will have a replication factor of 2
+ * unless the property is explicitly overridden using the stream descriptor.
+ *
+ * @param defaultStreamConfigs default stream properties
+ * @return this system descriptor
+ */
+ public SubClass withDefaultStreamConfigs(Map<String, String> defaultStreamConfigs) {
+ this.defaultStreamConfigs.putAll(defaultStreamConfigs);
+ return (SubClass) this;
+ }
+
+ public String getSystemName() {
+ return this.systemName;
+ }
+
+ public Optional<InputTransformer> getTransformer() {
+ return this.transformerOptional;
+ }
+
+ public Optional<StreamExpander> getExpander() {
+ return this.expanderOptional;
+ }
+
+ private boolean isValidSystemName(String id) {
+ return StringUtils.isNotBlank(id) && SYSTEM_NAME_PATTERN.matcher(id).matches();
+ }
+
+ /**
+ * Generates the configuration entries described by this system descriptor.
+ * <p>
+ * The returned map contains, when present, the system factory class name and the default stream offset
+ * default, plus every default stream config and every system config, each keyed under this system's name.
+ * A new map is created on every call.
+ *
+ * @return a map of configuration keys to values for this system
+ */
+ public Map<String, String> toConfig() {
+ HashMap<String, String> configs = new HashMap<>();
+ this.factoryClassNameOptional.ifPresent(name -> configs.put(String.format(FACTORY_CONFIG_KEY, systemName), name));
+ // Lower-case with Locale.ROOT so the emitted value does not depend on the JVM default locale
+ // (e.g. under a Turkish locale "UPCOMING".toLowerCase() yields "upcomıng" with a dotless i).
+ this.defaultStreamOffsetDefaultOptional.ifPresent(dsod ->
+ configs.put(String.format(DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, systemName), dsod.name().toLowerCase(Locale.ROOT)));
+ this.defaultStreamConfigs.forEach((key, value) ->
+ configs.put(String.format(DEFAULT_STREAM_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
+ this.systemConfigs.forEach((key, value) ->
+ configs.put(String.format(SYSTEM_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
+ return configs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/TransformingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/TransformingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/TransformingInputDescriptorProvider.java
new file mode 100644
index 0000000..e626efd
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/TransformingInputDescriptorProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to
+ * their own {@code InputTransformer} function result types.
+ *
+ * @param <InputTransformerType> type of the system level {@code InputTransformer} results
+ */
+public interface TransformingInputDescriptorProvider<InputTransformerType> {
+
+ /**
+ * Gets a {@link InputDescriptor} for an input stream on this system. The stream has the provided
+ * stream level serde, and the default system level {@code InputTransformer}.
+ * <p>
+ * The type of messages in the stream is the type of messages returned by the default system level
+ * {@code InputTransformer}
+ *
+ * @param streamId id of the input stream
+ * @param serde stream level serde for the input stream
+ * @return an {@link InputDescriptor} for the input stream
+ */
+ InputDescriptor<InputTransformerType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
index 7b8754a..296edf4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
deleted file mode 100644
index 350324c..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.util.Map;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
-
-/**
- * A table provider provides the implementation for a table. It ensures a table is
- * properly constructed and also manages its lifecycle.
- */
-@InterfaceStability.Unstable
-public interface TableProvider {
- /**
- * Initialize TableProvider with container and task context
- * @param context context for the task
- */
- void init(Context context);
-
- /**
- * Get an instance of the table for read/write operations
- * @return the underlying table
- */
- Table getTable();
-
- /**
- * Generate any configuration for this table, the generated configuration
- * is used by Samza container to construct this table and any components
- * necessary. Instead of manipulating the input parameters, this method
- * should return the generated configuration.
- *
- * @param jobConfig the job config
- * @param generatedConfig config generated by earlier processing, but has
- * not yet been merged to job config
- * @return configuration for this table
- */
- Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
-
- /**
- * Shutdown the underlying table
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
deleted file mode 100644
index 1bb0196..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * Factory of a table provider object
- */
-@InterfaceStability.Unstable
-public interface TableProviderFactory {
- /**
- * Constructs an instances of the table provider based on a given table spec
- * @param tableSpec the table spec
- * @return the table provider
- */
- TableProvider getTableProvider(TableSpec tableSpec);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
index 82577c4..2883a93 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
@@ -32,8 +32,9 @@ import org.apache.samza.storage.SideInputsProcessor;
/**
* TableSpec is a blueprint for creating, validating, or simply describing a table in the runtime environment.
*
- * It is typically created indirectly by constructing an instance of {@link org.apache.samza.operators.TableDescriptor},
- * and then invoke <code>BaseTableDescriptor.getTableSpec()</code>.
+ * It is typically created indirectly by constructing an instance of
+ * {@link org.apache.samza.table.descriptors.TableDescriptor}, and then invoking
+ * <code>BaseTableDescriptor.getTableSpec()</code>.
*
* It has specific attributes for common behaviors that Samza uses.
*
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
new file mode 100644
index 0000000..5d7b89e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * User facing class to collect metadata that fully describes a
+ * Samza table. This interface should be implemented by concrete table implementations.
+ * <p>
+ * Typical user code should look like the following. Note that <code>withConfig()</code>
+ * is defined in this interface and the rest in its implementations.
+ *
+ * <pre>
+ * {@code
+ * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl",
+ * KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
+ * .withBlockSize(1024)
+ * .withConfig("some-key", "some-value");
+ * }
+ * </pre>
+
+ * Once constructed, a table descriptor can be registered with the system. Internally,
+ * the table descriptor is then converted to a {@link org.apache.samza.table.TableSpec},
+ * which is used to track tables internally.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+@InterfaceStability.Unstable
+public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
+
+ /**
+ * Get the Id of the table
+ * @return Id of the table
+ */
+ String getTableId();
+
+ /**
+ * Add a configuration entry for the table
+ * @param key the key
+ * @param value the value
+ * @return this table descriptor instance
+ */
+ D withConfig(String key, String value);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
new file mode 100644
index 0000000..8640b2a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.Table;
+
+/**
+ * A table provider provides the implementation for a table. It ensures a table is
+ * properly constructed and also manages its lifecycle.
+ */
+@InterfaceStability.Unstable
+public interface TableProvider {
+ /**
+ * Initialize TableProvider with container and task context
+ * @param context context for the task
+ */
+ void init(Context context);
+
+ /**
+ * Get an instance of the table for read/write operations
+ * @return the underlying table
+ */
+ Table getTable();
+
+ /**
+ * Generate any configuration for this table. The generated configuration
+ * is used by the Samza container to construct this table and any necessary
+ * components. Instead of manipulating the input parameters, this method
+ * should return the generated configuration.
+ *
+ * @param jobConfig the job config
+ * @param generatedConfig config generated by earlier processing, but has
+ * not yet been merged to job config
+ * @return configuration for this table
+ */
+ Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
+
+ /**
+ * Shutdown the underlying table
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
new file mode 100644
index 0000000..2f9a607
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Factory of a table provider object
+ */
+@InterfaceStability.Unstable
+public interface TableProviderFactory {
+ /**
+ * Constructs an instance of the table provider based on a given table spec
+ * @param tableSpec the table spec
+ * @return the table provider
+ */
+ TableProvider getTableProvider(TableSpec tableSpec);
+}