You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:51 UTC

[11/12] samza git commit: Consolidating package names for System, Stream, Application and Table descriptors.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
deleted file mode 100644
index de40932..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.serializers.Serde;
-
-
-/**
- * Interface for simple {@code SystemDescriptor}s that return {@code InputDescriptor}s parameterized by the type of
- * the provided stream level serde.
- */
-public interface SimpleInputDescriptorProvider {
-
-  /**
-   * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
-   * stream level serde.
-   * <p>
-   * The type of messages in the stream is the type of the provided stream level serde.
-   *
-   * @param streamId id of the input stream
-   * @param serde stream level serde for the input stream
-   * @param <StreamMessageType> type of messages in this stream
-   * @return an {@link InputDescriptor} for the input stream
-   */
-  <StreamMessageType> InputDescriptor<StreamMessageType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde<StreamMessageType> serde);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
deleted file mode 100644
index 4bb121d..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import com.google.common.base.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.system.SystemStreamMetadata.OffsetType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The base descriptor for a system. Allows setting properties that are common to all systems.
- * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
- * <p>
- * Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
- * {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
- * to another message that is delivered to the {@code MessageStream}. It is applied at runtime in
- * {@code InputOperatorImpl}.
- * <p>
- * Systems may provide a {@link StreamExpander} to be used for input streams on the system. A {@link StreamExpander}
- * expands the provided {@code InputDescriptor} to a sub-DAG of one or more operators on the {@code StreamApplicationDescriptor},
- * and returns a new {@code MessageStream} with the combined results. It is called during graph description
- * in {@code StreamApplicationDescriptor#getInputStream}.
- * <p>
- * Systems that support consuming messages from a stream should provide users means of obtaining an
- * {@code InputDescriptor}. Recommended interfaces for doing so are {@link TransformingInputDescriptorProvider} for
- * systems that support system level {@link InputTransformer}, {@link ExpandingInputDescriptorProvider} for systems
- * that support system level {@link StreamExpander} functions, and {@link SimpleInputDescriptorProvider} otherwise.
- * <p>
- * Systems that support producing messages to a stream should provide users means of obtaining an
- * {@code OutputDescriptor}. Recommended interface for doing so is {@link OutputDescriptorProvider}.
- * <p>
- * It is not required for SystemDescriptors to implement one of the Provider interfaces above. System implementers
- * may choose to expose additional or alternate APIs for obtaining Input/Output Descriptors by extending
- * SystemDescriptor directly.
- *
- * @param <SubClass> type of the concrete sub-class
- */
-public abstract class SystemDescriptor<SubClass extends SystemDescriptor<SubClass>> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SystemDescriptor.class);
-  private static final String FACTORY_CONFIG_KEY = "systems.%s.samza.factory";
-  private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default";
-  private static final String DEFAULT_STREAM_CONFIGS_CONFIG_KEY = "systems.%s.default.stream.%s";
-  private static final String SYSTEM_CONFIGS_CONFIG_KEY = "systems.%s.%s";
-  private static final Pattern SYSTEM_NAME_PATTERN = Pattern.compile("[\\d\\w-_]+");
-
-  private final String systemName;
-  private final Optional<String> factoryClassNameOptional;
-  private final Optional<InputTransformer> transformerOptional;
-  private final Optional<StreamExpander> expanderOptional;
-
-  private final Map<String, String> systemConfigs = new HashMap<>();
-  private final Map<String, String> defaultStreamConfigs = new HashMap<>();
-  private Optional<OffsetType> defaultStreamOffsetDefaultOptional = Optional.empty();
-
-  /**
-   * Constructs a {@link SystemDescriptor} instance.
-   *
-   * @param systemName name of this system
-   * @param factoryClassName name of the SystemFactory class for this system
-   * @param transformer the {@link InputTransformer} for the system if any, else null
-   * @param expander the {@link StreamExpander} for the system if any, else null
-   */
-  public SystemDescriptor(String systemName, String factoryClassName, InputTransformer transformer, StreamExpander expander) {
-    Preconditions.checkArgument(isValidSystemName(systemName),
-        String.format("systemName: %s must be non-empty and must not contain spaces or special characters.", systemName));
-    if (StringUtils.isBlank(factoryClassName)) {
-      LOGGER.warn("Blank SystemFactory class name for system: {}. A value must be provided in configuration using {}.",
-          systemName, String.format(FACTORY_CONFIG_KEY, systemName));
-    }
-    this.systemName = systemName;
-    this.factoryClassNameOptional = Optional.ofNullable(StringUtils.stripToNull(factoryClassName));
-    this.transformerOptional = Optional.ofNullable(transformer);
-    this.expanderOptional = Optional.ofNullable(expander);
-  }
-
-  /**
-   * If a container starts up without a checkpoint, this property determines where in the input stream we should start
-   * consuming. The value must be an {@link OffsetType}, one of the following:
-   * <ul>
-   *  <li>upcoming: Start processing messages that are published after the job starts.
-   *                Any messages published while the job was not running are not processed.
-   *  <li>oldest: Start processing at the oldest available message in the system,
-   *              and reprocess the entire available message history.
-   * </ul>
-   * This property is for all streams obtained using this system descriptor. To set it for an individual stream,
-   * see {@link org.apache.samza.operators.descriptors.base.stream.InputDescriptor#withOffsetDefault}.
-   * If both are defined, the stream-level definition takes precedence.
-   *
-   * @param offsetType offset type to start processing from
-   * @return this system descriptor
-   */
-  public SubClass withDefaultStreamOffsetDefault(OffsetType offsetType) {
-    this.defaultStreamOffsetDefaultOptional = Optional.ofNullable(offsetType);
-    return (SubClass) this;
-  }
-
-  /**
-   * Additional system-specific properties for this system.
-   * <p>
-   * These properties are added under the {@code systems.system-name.*} scope.
-   *
-   * @param systemConfigs system-specific properties for this system
-   * @return this system descriptor
-   */
-  public SubClass withSystemConfigs(Map<String, String> systemConfigs) {
-    this.systemConfigs.putAll(systemConfigs);
-    return (SubClass) this;
-  }
-
-  /**
-   * Default properties for any stream obtained using this system descriptor.
-   * <p>
-   * For example, if "systems.kafka-system.default.stream.replication.factor"=2 was configured,
-   * then every Kafka stream created on the kafka-system will have a replication factor of 2
-   * unless the property is explicitly overridden using the stream descriptor.
-   *
-   * @param defaultStreamConfigs default stream properties
-   * @return this system descriptor
-   */
-  public SubClass withDefaultStreamConfigs(Map<String, String> defaultStreamConfigs) {
-    this.defaultStreamConfigs.putAll(defaultStreamConfigs);
-    return (SubClass) this;
-  }
-
-  public String getSystemName() {
-    return this.systemName;
-  }
-
-  public Optional<InputTransformer> getTransformer() {
-    return this.transformerOptional;
-  }
-
-  public Optional<StreamExpander> getExpander() {
-    return this.expanderOptional;
-  }
-
-  private boolean isValidSystemName(String id) {
-    return StringUtils.isNotBlank(id) && SYSTEM_NAME_PATTERN.matcher(id).matches();
-  }
-
-  public Map<String, String> toConfig() {
-    HashMap<String, String> configs = new HashMap<>();
-    this.factoryClassNameOptional.ifPresent(name -> configs.put(String.format(FACTORY_CONFIG_KEY, systemName), name));
-    this.defaultStreamOffsetDefaultOptional.ifPresent(dsod ->
-        configs.put(String.format(DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, systemName), dsod.name().toLowerCase())); // NOTE(review): no-arg toLowerCase() uses the default locale; toLowerCase(Locale.ROOT) is locale-safe
-    this.defaultStreamConfigs.forEach((key, value) ->
-        configs.put(String.format(DEFAULT_STREAM_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
-    this.systemConfigs.forEach((key, value) ->
-        configs.put(String.format(SYSTEM_CONFIGS_CONFIG_KEY, getSystemName(), key), value));
-    return configs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
deleted file mode 100644
index 5b43fbd..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors.base.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to
- * their own {@code InputTransformer} function result types.
- *
- * @param <InputTransformerType> type of the system level {@code InputTransformer} results
- */
-public interface TransformingInputDescriptorProvider<InputTransformerType> {
-
-  /**
-   * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
-   * stream level serde, and the default system level {@code InputTransformer}.
-   * <p>
-   * The type of messages in the stream is the type of messages returned by the default system level
-   * {@code InputTransformer}.
-   *
-   * @param streamId id of the input stream
-   * @param serde stream level serde for the input stream
-   * @return an {@link InputDescriptor} for the input stream
-   */
-  InputDescriptor<InputTransformerType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
index 12823cc..d3c1a23 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
@@ -24,12 +24,12 @@ import org.apache.samza.annotation.InterfaceStability;
 
 /**
  * A function that can be closed after its execution.
- *
- * <p> Implement {@link #close()} to free resources used during the execution of the function, clean up state etc.
- *
- * <p> Order of finalization: {@link ClosableFunction}s are closed in the reverse topological order of operators in the
- * {@link org.apache.samza.application.StreamApplicationDescriptor}. For any two operators A and B in the graph, if operator B
- * consumes results from operator A, then operator B is guaranteed to be closed before operator A.
+ * <p>
+ * Implement {@link #close()} to free resources used during the execution of the function, clean up state etc.
+ * <p>
+ * Order of finalization: {@link ClosableFunction}s are closed in the reverse topological order of operators in the
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor}. For any two operators A and B in the
+ * graph, if operator B consumes results from operator A, then operator B is guaranteed to be closed before operator A.
  *
  */
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
index 7f950de..c808876 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
@@ -24,10 +24,11 @@ import org.apache.samza.context.Context;
 
 /**
  * A function that can be initialized before execution.
- *
- * <p> Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the
- * {@link org.apache.samza.application.StreamApplicationDescriptor}. For any two operators A and B in the graph, if operator B
- * consumes results from operator A, then operator A is guaranteed to be initialized before operator B.
+ * <p>
+ * Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor}. For any two operators A and B in the
+ * graph, if operator B consumes results from operator A, then operator A is guaranteed to be initialized before
+ * operator B.
  *
  */
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
deleted file mode 100644
index 704d187..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.functions;
-
-import java.io.Serializable;
-import org.apache.samza.system.IncomingMessageEnvelope;
-
-/**
- * Transforms an {@link IncomingMessageEnvelope} with deserialized key and message to a message of type {@code OM}
- * which is delivered to the {@code MessageStream}. Called in {@code InputOperatorImpl} when incoming messages
- * from a {@code SystemConsumer} are being delivered to the application.
- * <p>
- * This is provided by default by transforming system descriptor implementations and cannot be overridden
- * or set on a per stream level.
- *
- * @param <OM> type of the transformed message
- */
-public interface InputTransformer<OM> extends InitableFunction, ClosableFunction, Serializable {
-
-  /**
-   * Transforms the provided {@link IncomingMessageEnvelope} with deserialized key and message into another message
-   * which is delivered to the {@code MessageStream}.
-   *
-   * @param ime  the {@link IncomingMessageEnvelope} to be transformed
-   * @return  the transformed message
-   */
-  OM apply(IncomingMessageEnvelope ime);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
deleted file mode 100644
index 7bbf601..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.functions;
-
-import java.io.Serializable;
-import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-
-/**
- * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
- * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamApplicationDescriptor#getInputStream}
- * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an expanding system descriptor.
- * <p>
- * This is provided by default by expanding system descriptor implementations and cannot be overridden
- * or set on a per stream level.
- *
- * @param <OM> type of the messages in the resultant {@link MessageStream}
- */
-public interface StreamExpander<OM> extends Serializable {
-
-  /**
-   * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
-   * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor}
-   * is being used to get a {@link MessageStream} using {@link StreamApplicationDescriptor#getInputStream}.
-   * <p>
-   * Notes for system implementers:
-   * <p>
-   * Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call
-   * {@link StreamApplicationDescriptor#getInputStream} with an {@link InputDescriptor} from an expanding system descriptor
-   * (like this one) again.
-   * <p>
-   * It's the {@link StreamExpander}'s responsibility to propagate any properties, including serde, from the
-   * user-provided {@link InputDescriptor} to the expanded input descriptors.
-   *
-   * @param streamAppDesc the {@link StreamApplicationDescriptor} to register the expanded sub-DAG of operators on
-   * @param inputDescriptor the {@link InputDescriptor} to be expanded
-   * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
-   */
-  MessageStream<OM> apply(StreamApplicationDescriptor streamAppDesc, InputDescriptor inputDescriptor);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/ExpandingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/ExpandingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/ExpandingInputDescriptorProvider.java
new file mode 100644
index 0000000..d180e1e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/ExpandingInputDescriptorProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to
+ * their own {@code StreamExpander} function result types.
+ *
+ * @param <StreamExpanderType> type of the system level {@code StreamExpander} results
+ */
+public interface ExpandingInputDescriptorProvider<StreamExpanderType> {
+
+  /**
+   * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
+   * stream level serde, and the default system level {@code StreamExpander}.
+   * <p>
+   * The type of messages in the stream is the type of messages returned by the default system level
+   * {@code StreamExpander}.
+   *
+   * @param streamId id of the input stream
+   * @param serde stream level serde to be propagated to expanded input streams
+   * @return an {@link InputDescriptor} for the input stream
+   */
+  InputDescriptor<StreamExpanderType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
new file mode 100644
index 0000000..6d18afc
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a generic input stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured
+ * {@link GenericSystemDescriptor}.
+ * <p>
+ * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
+ * Otherwise, this {@link GenericInputDescriptor} may be used to provide Samza-specific properties of the input stream.
+ * Additional system stream specific properties may be provided using {@link #withStreamConfigs}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public final class GenericInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, GenericInputDescriptor<StreamMessageType>> {
+  GenericInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
+    super(streamId, serde, systemDescriptor, null);
+  }
+
+  @Override
+  public GenericInputDescriptor<StreamMessageType> withPhysicalName(String physicalName) {
+    return super.withPhysicalName(physicalName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
new file mode 100644
index 0000000..5fe3a98
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for a generic output stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured
+ * {@link GenericSystemDescriptor}.
+ * <p>
+ * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
+ * Otherwise, this {@link GenericOutputDescriptor} may be used to provide Samza-specific properties of the output stream.
+ * Additional system stream specific properties may be provided using {@link #withStreamConfigs}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public final class GenericOutputDescriptor<StreamMessageType>
+    extends OutputDescriptor<StreamMessageType, GenericOutputDescriptor<StreamMessageType>> {
+  GenericOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
+    super(streamId, serde, systemDescriptor);
+  }
+
+  @Override
+  public GenericOutputDescriptor<StreamMessageType> withPhysicalName(String physicalName) {
+    return super.withPhysicalName(physicalName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
new file mode 100644
index 0000000..59b2a12
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Describes a system that does not ship a dedicated descriptor implementation of its own.
+ * <p>
+ * Prefer a system's own system and stream descriptors when it provides them. When it does not, use this
+ * {@link GenericSystemDescriptor} to set the Samza-specific properties of the system, and supply any additional
+ * system-specific properties through {@link #withSystemConfigs}.
+ * <p>
+ * A property set through a descriptor takes precedence over the same property supplied in configuration.
+ */
+public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+
+  /**
+   * Creates a {@link GenericSystemDescriptor} that has no system-level serde. Every input or output descriptor
+   * obtained from it must therefore be given an explicit stream-level serde.
+   *
+   * @param systemName name of this system
+   * @param factoryClassName name of the SystemFactory class for this system
+   */
+  public GenericSystemDescriptor(String systemName, String factoryClassName) {
+    super(systemName, factoryClassName, null, null);
+  }
+
+  /**
+   * Returns a {@link GenericInputDescriptor} for {@code streamId} on this system, using {@code serde} as its
+   * stream-level serde.
+   */
+  @Override
+  public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    final GenericSystemDescriptor system = this;
+    return new GenericInputDescriptor<>(streamId, system, serde);
+  }
+
+  /**
+   * Returns a {@link GenericOutputDescriptor} for {@code streamId} on this system, using {@code serde} as its
+   * stream-level serde.
+   */
+  @Override
+  public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    final GenericSystemDescriptor system = this;
+    return new GenericOutputDescriptor<>(streamId, system, serde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
new file mode 100644
index 0000000..2c0ca88
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamMetadata.OffsetType;
+
+/**
+ * The base descriptor for an input stream. Allows setting properties that are common to all input streams.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Only properties that were explicitly set on this descriptor are emitted by {@link #toConfig()}.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class InputDescriptor<StreamMessageType, SubClass extends InputDescriptor<StreamMessageType, SubClass>>
+    extends StreamDescriptor<StreamMessageType, SubClass> {
+  private static final String RESET_OFFSET_CONFIG_KEY = "streams.%s.samza.reset.offset";
+  private static final String OFFSET_DEFAULT_CONFIG_KEY = "streams.%s.samza.offset.default";
+  private static final String PRIORITY_CONFIG_KEY = "streams.%s.samza.priority";
+  private static final String BOOTSTRAP_CONFIG_KEY = "streams.%s.samza.bootstrap";
+  private static final String BOUNDED_CONFIG_KEY = "streams.%s.samza.bounded";
+  private static final String DELETE_COMMITTED_MESSAGES_CONFIG_KEY = "streams.%s.samza.delete.committed.messages";
+
+  private final Optional<InputTransformer> transformerOptional;
+
+  // Each of these stays empty until its setter is called; empty values contribute no entry to toConfig().
+  private Optional<Boolean> resetOffsetOptional = Optional.empty();
+  private Optional<OffsetType> offsetDefaultOptional = Optional.empty();
+  private Optional<Integer> priorityOptional = Optional.empty();
+  private Optional<Boolean> isBootstrapOptional = Optional.empty();
+  private Optional<Boolean> isBoundedOptional = Optional.empty();
+  private Optional<Boolean> deleteCommittedMessagesOptional = Optional.empty();
+
+  /**
+   * Constructs an {@link InputDescriptor} instance.
+   *
+   * @param streamId id of the stream
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   * @param transformer stream level input stream transform function if available, else null
+   */
+  public InputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor, InputTransformer transformer) {
+    super(streamId, serde, systemDescriptor);
+
+    // stream level transformer takes precedence over system level transformer
+    if (transformer != null) {
+      this.transformerOptional = Optional.of(transformer);
+    } else {
+      this.transformerOptional = systemDescriptor.getTransformer();
+    }
+  }
+
+  /**
+   * If set, when a Samza container starts up, it ignores any checkpointed offset for this particular
+   * input stream. Its behavior is thus determined by the {@link #withOffsetDefault} setting.
+   * Note that the reset takes effect every time a container is started, which may be every time you restart your job,
+   * or more frequently if a container fails and is restarted by the framework.
+   *
+   * @return this input descriptor
+   */
+  public SubClass shouldResetOffset() {
+    this.resetOffsetOptional = Optional.of(true);
+    return self();
+  }
+
+  /**
+   * If a container starts up without a checkpoint, this property determines where in the input stream we should start
+   * consuming. The value must be an OffsetType, one of the following:
+   * <ul>
+   *  <li>upcoming: Start processing messages that are published after the job starts.
+   *                Any messages published while the job was not running are not processed.
+   *  <li>oldest: Start processing at the oldest available message in the system,
+   *              and reprocess the entire available message history.
+   * </ul>
+   * This property is for an individual stream. To set it for all streams within a system, see
+   * {@link SystemDescriptor#withDefaultStreamOffsetDefault}. If both are defined, the stream-level definition
+   * takes precedence.
+   * <p>
+   * The value is written to configuration as the lower-cased enum constant name. Passing {@code null} clears
+   * a previously set value.
+   *
+   * @param offsetDefault offset type to start processing from
+   * @return this input descriptor
+   */
+  public SubClass withOffsetDefault(OffsetType offsetDefault) {
+    this.offsetDefaultOptional = Optional.ofNullable(offsetDefault);
+    return self();
+  }
+
+  /**
+   * If one or more streams have a priority set (any positive integer), they will be processed with higher priority
+   * than the other streams.
+   * <p>
+   * You can set several streams to the same priority, or define multiple priority levels by assigning a
+   * higher number to the higher-priority streams.
+   * <p>
+   * If a higher-priority stream has any messages available, they will always be processed first;
+   * messages from lower-priority streams are only processed when there are no new messages on higher-priority inputs.
+   *
+   * @param priority priority for this input stream
+   * @return this input descriptor
+   */
+  public SubClass withPriority(int priority) {
+    this.priorityOptional = Optional.of(priority);
+    return self();
+  }
+
+  /**
+   * If set, this stream will be processed as a bootstrap stream. This means that every time a Samza container
+   * starts up, this stream will be fully consumed before messages from any other stream are processed.
+   *
+   * @return this input descriptor
+   */
+  public SubClass shouldBootstrap() {
+    this.isBootstrapOptional = Optional.of(true);
+    return self();
+  }
+
+  /**
+   * If set, this stream will be considered a bounded stream. If all input streams in an application are
+   * bounded, the job is considered to be running in batch processing mode.
+   *
+   * @return this input descriptor
+   */
+  public SubClass isBounded() {
+    this.isBoundedOptional = Optional.of(true);
+    return self();
+  }
+
+  /**
+   * If set, and supported by the system implementation, messages older than the latest checkpointed offset
+   * for this stream may be deleted after the commit.
+   *
+   * @return this input descriptor
+   */
+  public SubClass shouldDeleteCommittedMessages() {
+    this.deleteCommittedMessagesOptional = Optional.of(true);
+    return self();
+  }
+
+  /**
+   * Returns the transformer for this stream. This is the stream-level transformer passed to the constructor if it
+   * was non-null; otherwise it is whatever the system descriptor's {@code getTransformer()} returned.
+   *
+   * @return the input transformer for this stream, if any
+   */
+  public Optional<InputTransformer> getTransformer() {
+    return this.transformerOptional;
+  }
+
+  /**
+   * Returns the configuration for this input stream. It contains everything from {@link StreamDescriptor#toConfig()}
+   * plus one {@code streams.<streamId>.samza.*} entry for each property explicitly set on this descriptor.
+   *
+   * @return an unmodifiable map of configuration keys to values
+   */
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> configs = new HashMap<>(super.toConfig());
+    String streamId = getStreamId();
+    // Locale.ROOT keeps the value locale-independent. With a plain toLowerCase(), a default locale such as Turkish
+    // would map the 'I' in a constant name like UPCOMING to a dotless 'ı', yielding an unrecognized offset value.
+    this.offsetDefaultOptional.ifPresent(od ->
+        configs.put(String.format(OFFSET_DEFAULT_CONFIG_KEY, streamId), od.name().toLowerCase(Locale.ROOT)));
+    this.resetOffsetOptional.ifPresent(resetOffset ->
+        configs.put(String.format(RESET_OFFSET_CONFIG_KEY, streamId), Boolean.toString(resetOffset)));
+    this.priorityOptional.ifPresent(priority ->
+        configs.put(String.format(PRIORITY_CONFIG_KEY, streamId), Integer.toString(priority)));
+    this.isBootstrapOptional.ifPresent(bootstrap ->
+        configs.put(String.format(BOOTSTRAP_CONFIG_KEY, streamId), Boolean.toString(bootstrap)));
+    this.isBoundedOptional.ifPresent(bounded ->
+        configs.put(String.format(BOUNDED_CONFIG_KEY, streamId), Boolean.toString(bounded)));
+    this.deleteCommittedMessagesOptional.ifPresent(deleteCommittedMessages ->
+        configs.put(String.format(DELETE_COMMITTED_MESSAGES_CONFIG_KEY, streamId),
+            Boolean.toString(deleteCommittedMessages)));
+    return Collections.unmodifiableMap(configs);
+  }
+
+  /**
+   * Returns this descriptor typed as its concrete sub-class, for fluent setters.
+   */
+  @SuppressWarnings("unchecked") // safe: SubClass is bounded by this class via the self-referential type parameter
+  private SubClass self() {
+    return (SubClass) this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/InputTransformer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/InputTransformer.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputTransformer.java
new file mode 100644
index 0000000..4435f06
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/InputTransformer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import java.io.Serializable;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+/**
+ * Transforms an {@link IncomingMessageEnvelope} with deserialized key and message to a message of type {@code OM}
+ * which is delivered to the {@code MessageStream}. Called in {@code InputOperatorImpl} when incoming messages
+ * from a {@code SystemConsumer} are being delivered to the application.
+ * <p>
+ * Transformers are supplied by transforming system descriptor implementations, not by applications. A system
+ * descriptor may provide a system-level transformer. An {@code InputDescriptor} may also be constructed with a
+ * stream-level transformer, which takes precedence over the system-level one for that stream.
+ * {@code InputDescriptor} itself exposes no setter through which an application can replace either.
+ *
+ * @param <OM> type of the transformed message
+ */
+public interface InputTransformer<OM> extends InitableFunction, ClosableFunction, Serializable {
+
+  /**
+   * Transforms the provided {@link IncomingMessageEnvelope} with deserialized key and message into another message
+   * which is delivered to the {@code MessageStream}.
+   *
+   * @param ime  the {@link IncomingMessageEnvelope} to be transformed
+   * @return  the transformed message
+   */
+  OM apply(IncomingMessageEnvelope ime);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
new file mode 100644
index 0000000..70a5d0f
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Abstract base for all output stream descriptors. It carries the properties that every output stream shares;
+ * subclasses may add system-specific ones.
+ * <p>
+ * A property set through a descriptor takes precedence over the same property supplied in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class OutputDescriptor<StreamMessageType, SubClass extends OutputDescriptor<StreamMessageType, SubClass>>
+    extends StreamDescriptor<StreamMessageType, SubClass> {
+
+  /**
+   * Creates an output descriptor. Validation and storage of the arguments are handled by {@link StreamDescriptor}.
+   *
+   * @param id id of the stream
+   * @param messageSerde serde for messages in the stream
+   * @param system system descriptor this stream descriptor was obtained from
+   */
+  public OutputDescriptor(String id, Serde messageSerde, SystemDescriptor system) {
+    super(id, messageSerde, system);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptorProvider.java
new file mode 100644
index 0000000..2ebfe79
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptorProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+
+/**
+ * Interface for simple {@code SystemDescriptors} that return {@code OutputDescriptors} parameterized by the type of
+ * the provided stream level serde.
+ */
+public interface OutputDescriptorProvider {
+
+  /**
+   * Gets an {@link OutputDescriptor} representing an output stream on this system that uses the provided
+   * stream specific serde instead of the default system serde.
+   * <p>
+   * An {@code OutputStream<KV<K, V>>}, obtained using a descriptor with a {@code KVSerde<K, V>}, can send messages
+   * of type {@code KV<K, V>}. An {@code OutputStream<M>} with any other {@code Serde<M>} can send messages of
+   * type M without a key.
+   * <p>
+   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used if the {@code SystemProducer}
+   * serializes the outgoing messages itself, and no prior serialization is required from the framework.
+   *
+   * @param streamId id of the output stream
+   * @param serde serde for this output stream that overrides the default system serde, if any.
+   * @param <StreamMessageType> type of messages in the output stream
+   * @return the {@link OutputDescriptor} for the output stream
+   */
+  <StreamMessageType> OutputDescriptor<StreamMessageType, ? extends OutputDescriptor> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/SimpleInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/SimpleInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/SimpleInputDescriptorProvider.java
new file mode 100644
index 0000000..ed1c6a4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/SimpleInputDescriptorProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+
+/**
+ * Interface for simple {@code SystemDescriptors} that return {@code InputDescriptors} parameterized by the type of
+ * the provided stream level serde.
+ */
+public interface SimpleInputDescriptorProvider {
+
+  /**
+   * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
+   * stream level serde.
+   * <p>
+   * The type of messages in the stream is the type of the provided stream level serde.
+   *
+   * @param streamId id of the input stream
+   * @param serde stream level serde for the input stream
+   * @param <StreamMessageType> type of messages in this stream
+   * @return an {@link InputDescriptor} for the input stream
+   */
+  <StreamMessageType> InputDescriptor<StreamMessageType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde<StreamMessageType> serde);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
new file mode 100644
index 0000000..d2b25f9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Common base for input and output stream descriptors. It holds the properties shared by every stream: its id,
+ * its serde, its owning system, an optional physical name and any extra system-specific stream configs.
+ * <p>
+ * A property set through a descriptor takes precedence over the same property supplied in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class StreamDescriptor<StreamMessageType, SubClass extends StreamDescriptor<StreamMessageType, SubClass>> {
+  private static final String SYSTEM_CONFIG_KEY = "streams.%s.samza.system";
+  private static final String PHYSICAL_NAME_CONFIG_KEY = "streams.%s.samza.physical.name";
+  private static final String STREAM_CONFIGS_CONFIG_KEY = "streams.%s.%s";
+  // Accepts ASCII letters, digits, underscores and hyphens; \d and _ are already covered by \w.
+  private static final Pattern STREAM_ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+  private final String streamId;
+  private final Serde serde;
+  private final SystemDescriptor systemDescriptor;
+
+  private final Map<String, String> streamConfigs = new HashMap<>();
+  private Optional<String> physicalNameOptional = Optional.empty();
+
+  /**
+   * Creates a stream descriptor after validating its arguments.
+   *
+   * @param streamId id of the stream
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   * @throws IllegalArgumentException if {@code systemDescriptor} or {@code serde} is null
+   * @throws IllegalStateException if {@code streamId} is blank or contains characters other than
+   *         letters, digits, underscores and hyphens
+   */
+  StreamDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) {
+    // Guava templates defer message formatting until a check actually fails.
+    Preconditions.checkArgument(systemDescriptor != null,
+        "SystemDescriptor must not be null. streamId: %s", streamId);
+    final String systemName = systemDescriptor.getSystemName();
+    Preconditions.checkState(isValidStreamId(streamId),
+        "streamId must be non-empty and must not contain spaces or special characters. "
+            + "streamId: %s, systemName: %s", streamId, systemName);
+    Preconditions.checkArgument(serde != null,
+        "Serde must not be null. streamId: %s systemName: %s", streamId, systemName);
+    this.streamId = streamId;
+    this.serde = serde;
+    this.systemDescriptor = systemDescriptor;
+  }
+
+  /**
+   * Sets the physical name of this stream on the system through which it is accessed. This differs from the
+   * {@code streamId}, which is the logical name Samza uses to identify the stream.
+   * <p>
+   * The physical name might be a Kafka topic name, an HDFS file URN, or any other system-specific identifier.
+   * When none is set, the logical {@code streamId} is used as the physical name. Passing {@code null} clears a
+   * previously set value.
+   *
+   * @param physicalName physical name for this stream.
+   * @return this stream descriptor.
+   */
+  protected SubClass withPhysicalName(String physicalName) {
+    physicalNameOptional = Optional.ofNullable(physicalName);
+    return self();
+  }
+
+  /**
+   * Adds system-specific properties for this stream, scoped under {@code streams.stream-id.*}. Later calls add to,
+   * and overwrite matching keys from, earlier ones.
+   *
+   * @param streamConfigs system-specific properties for this stream
+   * @return this stream descriptor
+   */
+  public SubClass withStreamConfigs(Map<String, String> streamConfigs) {
+    this.streamConfigs.putAll(streamConfigs);
+    return self();
+  }
+
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public String getSystemName() {
+    return systemDescriptor.getSystemName();
+  }
+
+  public Serde getSerde() {
+    return serde;
+  }
+
+  public SystemDescriptor getSystemDescriptor() {
+    return systemDescriptor;
+  }
+
+  public Optional<String> getPhysicalName() {
+    return physicalNameOptional;
+  }
+
+  /** Returns this descriptor typed as its concrete sub-class, for fluent setters. */
+  @SuppressWarnings("unchecked") // safe: SubClass is bounded by this class via the self-referential type parameter
+  private SubClass self() {
+    return (SubClass) this;
+  }
+
+  /** A valid id is non-blank and consists solely of characters accepted by STREAM_ID_PATTERN. */
+  private boolean isValidStreamId(String id) {
+    if (StringUtils.isBlank(id)) {
+      return false;
+    }
+    return STREAM_ID_PATTERN.matcher(id).matches();
+  }
+
+  /**
+   * Returns the configuration for this stream. It contains the owning system name, the physical name if one was
+   * set, and each extra stream config prefixed with {@code streams.<streamId>.}. Entries are added in that order,
+   * so an extra stream config with a colliding key wins.
+   *
+   * @return an unmodifiable map of configuration keys to values
+   */
+  public Map<String, String> toConfig() {
+    final Map<String, String> configs = new HashMap<>();
+    configs.put(String.format(SYSTEM_CONFIG_KEY, streamId), getSystemName());
+    physicalNameOptional.ifPresent(name -> configs.put(String.format(PHYSICAL_NAME_CONFIG_KEY, streamId), name));
+    for (Map.Entry<String, String> entry : streamConfigs.entrySet()) {
+      configs.put(String.format(STREAM_CONFIGS_CONFIG_KEY, streamId, entry.getKey()), entry.getValue());
+    }
+    return Collections.unmodifiableMap(configs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamExpander.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamExpander.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamExpander.java
new file mode 100644
index 0000000..4b8e6f0
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/StreamExpander.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import java.io.Serializable;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.operators.MessageStream;
+
+/**
+ * Turns a single {@link InputDescriptor} into a sub-DAG of one or more operators registered on the
+ * {@link StreamApplicationDescriptor}, and returns a {@link MessageStream} carrying the combined output of that
+ * sub-DAG. It is invoked when {@link StreamApplicationDescriptor#getInputStream} is called with an
+ * {@link InputDescriptor} that came from an expanding system descriptor.
+ * <p>
+ * Expanding system descriptor implementations supply this by default; it cannot be overridden or set
+ * per stream.
+ *
+ * @param <OM> type of the messages in the resultant {@link MessageStream}
+ */
+public interface StreamExpander<OM> extends Serializable {
+
+  /**
+   * Expands {@code inputDescriptor} into a sub-DAG of one or more operators on {@code streamAppDesc} and returns
+   * a {@link MessageStream} with the combined results. Invoked when the {@link InputDescriptor} is passed to
+   * {@link StreamApplicationDescriptor#getInputStream}.
+   * <p>
+   * Notes for system implementers:
+   * <ul>
+   *   <li>Avoid infinite recursion: do not call {@link StreamApplicationDescriptor#getInputStream} again with an
+   *       {@link InputDescriptor} obtained from an expanding system descriptor (such as this one).</li>
+   *   <li>The {@link StreamExpander} is responsible for propagating every property of the user-provided
+   *       {@link InputDescriptor}, including its serde, to the expanded input descriptors.</li>
+   * </ul>
+   *
+   * @param streamAppDesc the {@link StreamApplicationDescriptor} on which to register the expanded sub-DAG
+   * @param inputDescriptor the {@link InputDescriptor} to be expanded
+   * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
+   */
+  MessageStream<OM> apply(StreamApplicationDescriptor streamAppDesc, InputDescriptor inputDescriptor);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
new file mode 100644
index 0000000..4b93a32
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.system.SystemStreamMetadata.OffsetType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The base descriptor for a system. Allows setting properties that are common to all systems.
+ * <p>
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * <p>
+ * Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
+ * {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
+ * to another message that is delivered to the {@code MessageStream}. It is applied at runtime in
+ * {@code InputOperatorImpl}.
+ * <p>
+ * Systems may provide a {@link StreamExpander} to be used for input streams on the system. A {@link StreamExpander}
+ * expands the provided {@code InputDescriptor} to a sub-DAG of one or more operators on the
+ * {@code StreamApplicationDescriptor}, and returns a new {@code MessageStream} with the combined results. It is
+ * called during application description in {@code StreamApplicationDescriptor#getInputStream}.
+ * <p>
+ * Systems that support consuming messages from a stream should provide users means of obtaining an
+ * {@code InputDescriptor}. Recommended interfaces for doing so are {@link TransformingInputDescriptorProvider} for
+ * systems that support system level {@link InputTransformer}, {@link ExpandingInputDescriptorProvider} for systems
+ * that support system level {@link StreamExpander} functions, and {@link SimpleInputDescriptorProvider} otherwise.
+ * <p>
+ * Systems that support producing messages to a stream should provide users means of obtaining an
+ * {@code OutputDescriptor}. Recommended interface for doing so is {@link OutputDescriptorProvider}.
+ * <p>
+ * It is not required for SystemDescriptors to implement one of the Provider interfaces above. System implementers
+ * may choose to expose additional or alternate APIs for obtaining Input/Output Descriptors by extending
+ * SystemDescriptor directly.
+ *
+ * @param <SubClass> type of the concrete sub-class
+ */
+public abstract class SystemDescriptor<SubClass extends SystemDescriptor<SubClass>> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SystemDescriptor.class);
+  private static final String FACTORY_CONFIG_KEY = "systems.%s.samza.factory";
+  private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default";
+  private static final String DEFAULT_STREAM_CONFIGS_CONFIG_KEY = "systems.%s.default.stream.%s";
+  private static final String SYSTEM_CONFIGS_CONFIG_KEY = "systems.%s.%s";
+  // One or more word characters ([a-zA-Z0-9_]) or hyphens; the '-' before '_' is a literal, not a range.
+  private static final Pattern SYSTEM_NAME_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+  private final String systemName;
+  private final Optional<String> factoryClassNameOptional;
+  private final Optional<InputTransformer> transformerOptional;
+  private final Optional<StreamExpander> expanderOptional;
+
+  private final Map<String, String> systemConfigs = new HashMap<>();
+  private final Map<String, String> defaultStreamConfigs = new HashMap<>();
+  private Optional<OffsetType> defaultStreamOffsetDefaultOptional = Optional.empty();
+
+  /**
+   * Constructs a {@link SystemDescriptor} instance.
+   *
+   * @param systemName name of this system; must be non-blank and consist only of word characters
+   *                   ([a-zA-Z0-9_]) or hyphens
+   * @param factoryClassName name of the SystemFactory class for this system; if blank, a warning is logged and
+   *                         a value must instead be provided in configuration
+   * @param transformer the {@link InputTransformer} for the system if any, else null
+   * @param expander the {@link StreamExpander} for the system if any, else null
+   * @throws IllegalArgumentException if {@code systemName} is not a valid system name
+   */
+  public SystemDescriptor(String systemName, String factoryClassName, InputTransformer transformer, StreamExpander expander) {
+    // Guava's message template is only formatted when the check fails, unlike an eager String.format.
+    Preconditions.checkArgument(isValidSystemName(systemName),
+        "systemName: %s must be non-empty and must not contain spaces or special characters.", systemName);
+    if (StringUtils.isBlank(factoryClassName)) {
+      LOGGER.warn("Blank SystemFactory class name for system: {}. A value must be provided in configuration using {}.",
+          systemName, String.format(FACTORY_CONFIG_KEY, systemName));
+    }
+    this.systemName = systemName;
+    // Blank names become "absent", so toConfig() never emits an empty factory entry.
+    this.factoryClassNameOptional = Optional.ofNullable(StringUtils.stripToNull(factoryClassName));
+    this.transformerOptional = Optional.ofNullable(transformer);
+    this.expanderOptional = Optional.ofNullable(expander);
+  }
+
+  /**
+   * If a container starts up without a checkpoint, this property determines where in the input stream we should start
+   * consuming. The value must be an {@link OffsetType}, one of the following:
+   * <ul>
+   *  <li>upcoming: Start processing messages that are published after the job starts.
+   *                Any messages published while the job was not running are not processed.
+   *  <li>oldest: Start processing at the oldest available message in the system,
+   *              and reprocess the entire available message history.
+   * </ul>
+   * This property is for all streams obtained using this system descriptor. To set it for an individual stream,
+   * see {@link InputDescriptor#withOffsetDefault}.
+   * If both are defined, the stream-level definition takes precedence.
+   * <p>
+   * Passing {@code null} clears any previously set value.
+   *
+   * @param offsetType offset type to start processing from
+   * @return this system descriptor
+   */
+  @SuppressWarnings("unchecked") // SubClass is expected to be the concrete type of this descriptor (self-type)
+  public SubClass withDefaultStreamOffsetDefault(OffsetType offsetType) {
+    this.defaultStreamOffsetDefaultOptional = Optional.ofNullable(offsetType);
+    return (SubClass) this;
+  }
+
+  /**
+   * Additional system-specific properties for this system.
+   * <p>
+   * These properties are added under the {@code systems.system-name.*} scope. Entries are merged with any
+   * previously added ones; a repeated key replaces the earlier value.
+   *
+   * @param systemConfigs system-specific properties for this system
+   * @return this system descriptor
+   */
+  @SuppressWarnings("unchecked") // SubClass is expected to be the concrete type of this descriptor (self-type)
+  public SubClass withSystemConfigs(Map<String, String> systemConfigs) {
+    this.systemConfigs.putAll(systemConfigs);
+    return (SubClass) this;
+  }
+
+  /**
+   * Default properties for any stream obtained using this system descriptor.
+   * <p>
+   * For example, if "systems.kafka-system.default.stream.replication.factor"=2 was configured,
+   * then every Kafka stream created on the kafka-system will have a replication factor of 2
+   * unless the property is explicitly overridden using the stream descriptor.
+   * <p>
+   * Entries are merged with any previously added ones; a repeated key replaces the earlier value.
+   *
+   * @param defaultStreamConfigs default stream properties
+   * @return this system descriptor
+   */
+  @SuppressWarnings("unchecked") // SubClass is expected to be the concrete type of this descriptor (self-type)
+  public SubClass withDefaultStreamConfigs(Map<String, String> defaultStreamConfigs) {
+    this.defaultStreamConfigs.putAll(defaultStreamConfigs);
+    return (SubClass) this;
+  }
+
+  /**
+   * Gets the name of this system.
+   *
+   * @return the system name provided at construction
+   */
+  public String getSystemName() {
+    return this.systemName;
+  }
+
+  /**
+   * Gets the system level {@link InputTransformer}, if one was provided.
+   *
+   * @return the transformer, or {@link Optional#empty()} if none was provided
+   */
+  public Optional<InputTransformer> getTransformer() {
+    return this.transformerOptional;
+  }
+
+  /**
+   * Gets the system level {@link StreamExpander}, if one was provided.
+   *
+   * @return the expander, or {@link Optional#empty()} if none was provided
+   */
+  public Optional<StreamExpander> getExpander() {
+    return this.expanderOptional;
+  }
+
+  // Rejects null/blank names and anything that is not word characters or hyphens.
+  private boolean isValidSystemName(String id) {
+    return StringUtils.isNotBlank(id) && SYSTEM_NAME_PATTERN.matcher(id).matches();
+  }
+
+  /**
+   * Generates the configuration described by this system descriptor.
+   * <p>
+   * The factory class name (if present) and the default stream offset (if set) are written to their dedicated keys.
+   * Default stream configs are written under {@code systems.system-name.default.stream.*}, and system configs
+   * under {@code systems.system-name.*}. System configs are written last, so they win on any key collision.
+   *
+   * @return a new, mutable map containing the generated configuration
+   */
+  public Map<String, String> toConfig() {
+    Map<String, String> configs = new HashMap<>();
+    this.factoryClassNameOptional.ifPresent(name -> configs.put(String.format(FACTORY_CONFIG_KEY, systemName), name));
+    // Locale.ROOT: under e.g. a Turkish default locale, "UPCOMING".toLowerCase() would yield a dotless 'i'.
+    this.defaultStreamOffsetDefaultOptional.ifPresent(dsod ->
+        configs.put(String.format(DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, systemName),
+            dsod.name().toLowerCase(Locale.ROOT)));
+    this.defaultStreamConfigs.forEach((key, value) ->
+        configs.put(String.format(DEFAULT_STREAM_CONFIGS_CONFIG_KEY, systemName, key), value));
+    this.systemConfigs.forEach((key, value) ->
+        configs.put(String.format(SYSTEM_CONFIGS_CONFIG_KEY, systemName, key), value));
+    return configs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/system/descriptors/TransformingInputDescriptorProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/descriptors/TransformingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/system/descriptors/TransformingInputDescriptorProvider.java
new file mode 100644
index 0000000..e626efd
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/descriptors/TransformingInputDescriptorProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.descriptors;
+
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to
+ * their own {@code InputTransformer} function result types.
+ *
+ * @param <InputTransformerType> type of the system level {@code InputTransformer} results
+ */
+public interface TransformingInputDescriptorProvider<InputTransformerType> {
+
+  /**
+   * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided
+   * stream level serde, and the default system level {@code InputTransformer}.
+   * <p>
+   * The type of messages in the stream is the type of messages returned by the default system level
+   * {@code InputTransformer}.
+   *
+   * @param streamId id of the input stream
+   * @param serde stream level serde for the input stream
+   * @return an {@link InputDescriptor} for the input stream
+   */
+  InputDescriptor<InputTransformerType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
index 7b8754a..296edf4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
deleted file mode 100644
index 350324c..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.util.Map;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
-
-/**
- * A table provider provides the implementation for a table. It ensures a table is
- * properly constructed and also manages its lifecycle.
- */
-@InterfaceStability.Unstable
-public interface TableProvider {
-  /**
-   * Initializes this table provider with the container and task context carried by {@code context}.
-   * @param context context for the task
-   */
-  void init(Context context);
-
-  /**
-   * Gets an instance of the table for read/write operations.
-   * @return the underlying table
-   */
-  Table getTable();
-
-  /**
-   * Generates any configuration for this table. The generated configuration
-   * is used by the Samza container to construct this table and any components
-   * necessary. Instead of manipulating the input parameters, this method
-   * should return the generated configuration.
-   *
-   * @param jobConfig the job config
-   * @param generatedConfig config generated by earlier processing, but has
-   *                        not yet been merged to job config
-   * @return configuration for this table
-   */
-  Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
-
-  /**
-   * Shuts down the underlying table.
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
deleted file mode 100644
index 1bb0196..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * Factory of a table provider object
- */
-@InterfaceStability.Unstable
-public interface TableProviderFactory {
-  /**
-   * Constructs an instance of the table provider based on a given table spec.
-   * @param tableSpec the table spec
-   * @return the table provider
-   */
-  TableProvider getTableProvider(TableSpec tableSpec);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
index 82577c4..2883a93 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
@@ -32,8 +32,9 @@ import org.apache.samza.storage.SideInputsProcessor;
 /**
  * TableSpec is a blueprint for creating, validating, or simply describing a table in the runtime environment.
  *
- * It is typically created indirectly by constructing an instance of {@link org.apache.samza.operators.TableDescriptor},
- * and then invoke <code>BaseTableDescriptor.getTableSpec()</code>.
+ * It is typically created indirectly by constructing an instance of
+ * {@link org.apache.samza.table.descriptors.TableDescriptor}, and then invoking
+ * <code>BaseTableDescriptor.getTableSpec()</code>.
  *
  * It has specific attributes for common behaviors that Samza uses.
  *

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
new file mode 100644
index 0000000..5d7b89e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * User facing interface to collect metadata that fully describes a
+ * Samza table. This interface should be implemented by concrete table implementations.
+ * <p>
+ * Typical user code should look like the following, notice <code>withConfig()</code>
+ * is defined in this interface and the rest in implementing classes.
+ *
+ * <pre>
+ * {@code
+ * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl",
+ *         KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
+ *     .withBlockSize(1024)
+ *     .withConfig("some-key", "some-value");
+ * }
+ * </pre>
+ *
+ * Once constructed, a table descriptor can be registered with the system. Internally,
+ * the table descriptor is then converted to a {@link org.apache.samza.table.TableSpec},
+ * which is used to track tables internally.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+@InterfaceStability.Unstable
+public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
+
+  /**
+   * Gets the id of the table.
+   * @return id of the table
+   */
+  String getTableId();
+
+  /**
+   * Adds a configuration entry for the table.
+   * @param key the key
+   * @param value the value
+   * @return this table descriptor instance
+   */
+  D withConfig(String key, String value);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
new file mode 100644
index 0000000..8640b2a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.Table;
+
+/**
+ * A table provider provides the implementation for a table. It ensures a table is
+ * properly constructed and also manages its lifecycle.
+ */
+@InterfaceStability.Unstable
+public interface TableProvider {
+  /**
+   * Initializes this table provider with the container and task context carried by {@code context}.
+   * @param context context for the task
+   */
+  void init(Context context);
+
+  /**
+   * Gets an instance of the table for read/write operations.
+   * @return the underlying table
+   */
+  Table getTable();
+
+  /**
+   * Generates any configuration for this table. The generated configuration
+   * is used by the Samza container to construct this table and any components
+   * necessary. Instead of manipulating the input parameters, this method
+   * should return the generated configuration.
+   *
+   * @param jobConfig the job config
+   * @param generatedConfig config generated by earlier processing, but has
+   *                        not yet been merged to job config
+   * @return configuration for this table
+   */
+  Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig);
+
+  /**
+   * Shuts down the underlying table.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
new file mode 100644
index 0000000..2f9a607
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableProviderFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.descriptors;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Factory of a table provider object
+ */
+@InterfaceStability.Unstable
+public interface TableProviderFactory {
+  /**
+   * Constructs an instance of the table provider based on a given table spec.
+   * @param tableSpec the table spec
+   * @return the table provider
+   */
+  TableProvider getTableProvider(TableSpec tableSpec);
+}