You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:42 UTC

[02/12] samza git commit: Consolidating package names for System, Stream, Application and Table descriptors.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
deleted file mode 100644
index 399f9fd..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.Map;
-import junit.framework.Assert;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.util.NoOpMetricsRegistry;
-import org.junit.Test;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestBaseLocalStoreBackedTableProvider {
-
-  @Test
-  public void testInit() {
-    Context context = mock(Context.class);
-    TaskContext taskContext = mock(TaskContext.class);
-    when(context.getTaskContext()).thenReturn(taskContext);
-    when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
-    when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
-
-    TableSpec tableSpec = mock(TableSpec.class);
-    when(tableSpec.getId()).thenReturn("t1");
-
-    TableProvider tableProvider = createTableProvider(tableSpec);
-    tableProvider.init(context);
-    Assert.assertNotNull(tableProvider.getTable());
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testInitFail() {
-    TableSpec tableSpec = mock(TableSpec.class);
-    when(tableSpec.getId()).thenReturn("t1");
-    TableProvider tableProvider = createTableProvider(tableSpec);
-    Assert.assertNotNull(tableProvider.getTable());
-  }
-
-  @Test
-  public void testGenerateCommonStoreConfig() {
-    Map<String, String> generatedConfig = new HashMap<>();
-    generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
-    generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
-
-    TableSpec tableSpec = mock(TableSpec.class);
-    when(tableSpec.getId()).thenReturn("t1");
-
-    TableProvider tableProvider = createTableProvider(tableSpec);
-    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
-    Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
-    Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
-  }
-
-  @Test
-  public void testChangelogDisabled() {
-    TableSpec tableSpec = createTableDescriptor("t1")
-        .getTableSpec();
-
-    TableProvider tableProvider = createTableProvider(tableSpec);
-    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig());
-    Assert.assertEquals(2, tableConfig.size());
-    Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM(), "t1")));
-  }
-
-  @Test
-  public void testChangelogEnabled() {
-    TableSpec tableSpec = createTableDescriptor("t1")
-        .withChangelogEnabled()
-        .getTableSpec();
-
-    Map<String, String> jobConfig = new HashMap<>();
-    jobConfig.put(JobConfig.JOB_NAME(), "test-job");
-    jobConfig.put(JobConfig.JOB_ID(), "10");
-
-    TableProvider tableProvider = createTableProvider(tableSpec);
-    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(jobConfig), new MapConfig());
-    Assert.assertEquals(3, tableConfig.size());
-    Assert.assertEquals("test-job-10-table-t1", String.format(
-        tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))));
-  }
-
-  @Test
-  public void testChangelogEnabledWithCustomParameters() {
-    TableSpec tableSpec = createTableDescriptor("t1")
-        .withChangelogStream("my-stream")
-        .withChangelogReplicationFactor(100)
-        .getTableSpec();
-
-    TableProvider tableProvider = createTableProvider(tableSpec);
-    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig());
-    Assert.assertEquals(4, tableConfig.size());
-    Assert.assertEquals("my-stream", String.format(
-        tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))));
-    Assert.assertEquals("100", String.format(
-        tableConfig.get(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), "t1"))));
-  }
-
-  private TableProvider createTableProvider(TableSpec tableSpec) {
-    return new BaseLocalStoreBackedTableProvider(tableSpec) {
-      @Override
-      public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
-        return generateCommonStoreConfig(jobConfig, generatedConfig);
-      }
-    };
-  }
-
-  private BaseLocalStoreBackedTableDescriptor createTableDescriptor(String tableId) {
-    return new BaseLocalStoreBackedTableDescriptor(tableId) {
-      @Override
-      public TableSpec getTableSpec() {
-        validate();
-        Map<String, String> tableSpecConfig = new HashMap<>();
-        generateTableSpecConfig(tableSpecConfig);
-        return new TableSpec(tableId, serde, null, tableSpecConfig,
-            sideInputs, sideInputsProcessor);
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestBaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestBaseLocalStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestBaseLocalStoreBackedTableProvider.java
new file mode 100644
index 0000000..559f2e9
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestBaseLocalStoreBackedTableProvider.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestBaseLocalStoreBackedTableProvider {
+
+  @Test
+  public void testInit() {
+    Context context = mock(Context.class);
+    TaskContext taskContext = mock(TaskContext.class);
+    when(context.getTaskContext()).thenReturn(taskContext);
+    when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
+    when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+
+    TableSpec tableSpec = mock(TableSpec.class);
+    when(tableSpec.getId()).thenReturn("t1");
+
+    TableProvider tableProvider = createTableProvider(tableSpec);
+    tableProvider.init(context);
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testInitFail() {
+    TableSpec tableSpec = mock(TableSpec.class);
+    when(tableSpec.getId()).thenReturn("t1");
+    TableProvider tableProvider = createTableProvider(tableSpec);
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  @Test
+  public void testGenerateCommonStoreConfig() {
+    Map<String, String> generatedConfig = new HashMap<>();
+    generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    TableSpec tableSpec = mock(TableSpec.class);
+    when(tableSpec.getId()).thenReturn("t1");
+
+    TableProvider tableProvider = createTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
+    Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+    Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+  }
+
+  @Test
+  public void testChangelogDisabled() {
+    TableSpec tableSpec = createTableDescriptor("t1")
+        .getTableSpec();
+
+    TableProvider tableProvider = createTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig());
+    Assert.assertEquals(2, tableConfig.size());
+    Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM(), "t1")));
+  }
+
+  @Test
+  public void testChangelogEnabled() {
+    TableSpec tableSpec = createTableDescriptor("t1")
+        .withChangelogEnabled()
+        .getTableSpec();
+
+    Map<String, String> jobConfig = new HashMap<>();
+    jobConfig.put(JobConfig.JOB_NAME(), "test-job");
+    jobConfig.put(JobConfig.JOB_ID(), "10");
+
+    TableProvider tableProvider = createTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(jobConfig), new MapConfig());
+    Assert.assertEquals(3, tableConfig.size());
+    Assert.assertEquals("test-job-10-table-t1", String.format(
+        tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))));
+  }
+
+  @Test
+  public void testChangelogEnabledWithCustomParameters() {
+    TableSpec tableSpec = createTableDescriptor("t1")
+        .withChangelogStream("my-stream")
+        .withChangelogReplicationFactor(100)
+        .getTableSpec();
+
+    TableProvider tableProvider = createTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig());
+    Assert.assertEquals(4, tableConfig.size());
+    Assert.assertEquals("my-stream", String.format(
+        tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))));
+    Assert.assertEquals("100", String.format(
+        tableConfig.get(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), "t1"))));
+  }
+
+  private TableProvider createTableProvider(TableSpec tableSpec) {
+    return new BaseLocalStoreBackedTableProvider(tableSpec) {
+      @Override
+      public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
+        return generateCommonStoreConfig(jobConfig, generatedConfig);
+      }
+    };
+  }
+
+  private BaseLocalStoreBackedTableDescriptor createTableDescriptor(String tableId) {
+    return new BaseLocalStoreBackedTableDescriptor(tableId) {
+      @Override
+      public TableSpec getTableSpec() {
+        validate();
+        Map<String, String> tableSpecConfig = new HashMap<>();
+        generateTableSpecConfig(tableSpecConfig);
+        return new TableSpec(tableId, serde, null, tableSpecConfig,
+            sideInputs, sideInputsProcessor);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index a1c1bdd..7faff17 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -21,7 +21,7 @@ package org.apache.samza.sql.impl;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.sql.data.SamzaSqlCompositeKey;
@@ -29,7 +29,7 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 3a73e09..8636736 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.system.SystemStream;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index fd1a2a8..1caefe6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
 import org.apache.samza.sql.translator.QueryTranslator;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
index 435a2cc..94b2296 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
@@ -25,14 +25,14 @@ import java.util.Optional;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.Context;
+import org.apache.samza.system.descriptors.GenericOutputDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 817f145..c9365cc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -31,14 +31,11 @@ import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
@@ -50,7 +47,10 @@ import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericOutputDescriptor;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.TableDescriptor;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index be94160..cc765bd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -23,12 +23,12 @@ import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.commons.lang.Validate;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.Context;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index a7ab663..98cc92e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -32,9 +32,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index 4c78b5a..14314c8 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -26,8 +26,8 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -36,13 +36,13 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
 import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.utils.BaseTableProvider;
+import org.apache.samza.table.utils.descriptors.BaseTableProvider;
 
 
 public class TestIOResolverFactory implements SqlIOResolverFactory {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index 07ebe33..b9b0c96 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index f0a8a89..dcd7023 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -33,10 +33,10 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -49,7 +49,7 @@ import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.internal.util.reflection.Whitebox;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 2ed7a00..68db1e4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -31,7 +31,7 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.calcite.util.Pair;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 7a194db..cd81e0d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
index a74993f..4943504 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
@@ -22,14 +22,13 @@ package org.apache.samza.sql.translator;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.operators.TableImpl;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.storage.kv.RocksDbTableProvider;
-import org.apache.samza.table.Table;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableProvider;
 import org.apache.samza.table.TableSpec;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
index 7d5e0d2..ba9c8b3 100644
--- a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
@@ -21,7 +21,7 @@ package org.apache.samza.example;
 import java.time.Duration;
 import java.util.HashMap;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.triggers.Triggers;
@@ -33,9 +33,9 @@ import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
index 4ef2402..7721d44 100644
--- a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -20,7 +20,7 @@
 package org.apache.samza.example;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -29,9 +29,9 @@ import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
index e991c4e..4923b7d 100644
--- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
@@ -36,9 +36,9 @@ import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
index fe018f3..ac0db36 100644
--- a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -22,7 +22,7 @@ package org.apache.samza.example;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.runtime.ApplicationRunner;
@@ -31,9 +31,9 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 public class MergeExample implements StreamApplication {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 8d3812b..ea38984 100644
--- a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -20,7 +20,7 @@ package org.apache.samza.example;
 
 import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -29,9 +29,9 @@ import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index e2ebc93..1476c81 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -19,7 +19,7 @@
 package org.apache.samza.example;
 
 import java.time.Duration;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
@@ -36,9 +36,9 @@ import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
index 8a0ca28..2cf3ac3 100644
--- a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -20,7 +20,7 @@ package org.apache.samza.example;
 
 import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -32,9 +32,9 @@ import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
index 73dc10a..8f6c6f8 100644
--- a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
@@ -18,18 +18,18 @@
  */
 package org.apache.samza.example;
 
-import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
index 2f4c19c..51089f7 100644
--- a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -21,7 +21,7 @@ package org.apache.samza.example;
 
 import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -34,9 +34,9 @@ import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
index 42379f3..b47bf0a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
@@ -24,7 +24,7 @@ import java.time.Duration;
 import java.util.stream.Collectors;
 import java.util.List;
 import java.util.Map;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 
@@ -32,8 +32,8 @@ import static org.junit.Assert.assertThat;
 
 
 /**
- * Assertion utils on the content of a stream described by
- * {@link org.apache.samza.operators.descriptors.base.stream.StreamDescriptor}.
+ * Assertion utils on the content of a stream described by a
+ * {@link org.apache.samza.system.descriptors.StreamDescriptor}
  */
 public class StreamAssert {
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 5cd47de..ba7128a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -58,9 +58,9 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
 import org.apache.samza.task.AsyncStreamTask;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.util.FileUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
deleted file mode 100644
index 6065bf0..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.serializers.NoOpSerde;
-
-/**
- * A descriptor for an in memory stream of messages that can either have single or multiple partitions.
- * <p>
- *  An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
- * <p>
- * @param <StreamMessageType> type of messages in input stream
- */
-public class InMemoryInputDescriptor<StreamMessageType>
-    extends InputDescriptor<StreamMessageType, InMemoryInputDescriptor<StreamMessageType>> {
-  /**
-   * Constructs a new InMemoryInputDescriptor from specified components.
-   * @param systemDescriptor name of the system stream is associated with
-   * @param streamId name of the stream
-   */
-  InMemoryInputDescriptor(String streamId, InMemorySystemDescriptor systemDescriptor) {
-    super(streamId, new NoOpSerde<>(), systemDescriptor, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
deleted file mode 100644
index 75fe7ae..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.system;
-
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.NoOpSerde;
-
-/**
- * A descriptor for an in memory output stream.
- * <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- */
-public class InMemoryOutputDescriptor<StreamMessageType>
-    extends OutputDescriptor<StreamMessageType, InMemoryOutputDescriptor<StreamMessageType>> {
-
-  /**
-   * Constructs an {@link OutputDescriptor} instance.
-   * @param streamId id of the stream
-   * @param systemDescriptor system descriptor this stream descriptor was obtained from
-   */
-  InMemoryOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
-    super(streamId, new NoOpSerde<>(), systemDescriptor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
deleted file mode 100644
index 77948f6..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.system;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.InMemorySystemConfig;
-import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.inmemory.InMemorySystemFactory;
-import org.apache.samza.config.JavaSystemConfig;
-
-/**
- * A descriptor for InMemorySystem.
- * System properties configured using a descriptor override corresponding properties provided in configuration.
- * <p>
- * Following system level configs are set by default
- * <ol>
- *   <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
- *   <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
- *   <li>"inmemory.scope = "Scope id generated to isolate the system in memory</li>
- * </ol>
- */
-public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor>
-    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
-  private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName();
-  /**
-   * <p>
-   * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
-   * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
-   * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
-   * scope have the highest precedence.
-   *
-   * For this case, it generates following overridden configs
-   * <ol>
-   *      <li>"jobs.<job-name>.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
-   *      <li>"jobs.<job-name>.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
-   * </ol>
-   *
-   **/
-  private String inMemoryScope;
-
-  /**
-   * Constructs a new InMemorySystemDescriptor from specified components.
-   * <p>
-   * Every {@link InMemorySystemDescriptor} is configured to consume from the oldest offset, since stream is in memory and
-   * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams.
-   * <p>
-   * @param systemName unique name of the system
-   */
-  public InMemorySystemDescriptor(String systemName) {
-    super(systemName, FACTORY_CLASS_NAME, null, null);
-    this.withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
-  }
-
-  @Override
-  public <StreamMessageType> InMemoryInputDescriptor<StreamMessageType> getInputDescriptor(
-      String streamId, Serde<StreamMessageType> serde) {
-    return new InMemoryInputDescriptor<StreamMessageType>(streamId, this);
-  }
-
-  @Override
-  public <StreamMessageType> InMemoryOutputDescriptor<StreamMessageType> getOutputDescriptor(
-      String streamId, Serde<StreamMessageType> serde) {
-    return new InMemoryOutputDescriptor<StreamMessageType>(streamId, this);
-  }
-
-  /**
-   * {@code inMemoryScope} defines the unique instance of InMemorySystem, that this system uses
-   * This method is framework use only, users are not supposed to use it
-   *
-   * @param inMemoryScope acts as a unique global identifier for this instance of InMemorySystem
-   * @return this system descriptor
-   */
-  public InMemorySystemDescriptor withInMemoryScope(String inMemoryScope) {
-    this.inMemoryScope = inMemoryScope;
-    return this;
-  }
-
-  @Override
-  public Map<String, String> toConfig() {
-    HashMap<String, String> configs = new HashMap<>(super.toConfig());
-    configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
-    configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME);
-    return configs;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
new file mode 100644
index 0000000..0e49550
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system.descriptors;
+
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+
+/**
+ * A descriptor for an in memory stream of messages that can either have single or multiple partitions.
+ * <p>
+ *  An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ * <p>
+ * @param <StreamMessageType> type of messages in input stream
+ */
+public class InMemoryInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, InMemoryInputDescriptor<StreamMessageType>> {
+  /**
+   * Constructs a new InMemoryInputDescriptor from specified components.
+   * @param systemDescriptor name of the system stream is associated with
+   * @param streamId name of the stream
+   */
+  InMemoryInputDescriptor(String streamId, InMemorySystemDescriptor systemDescriptor) {
+    super(streamId, new NoOpSerde<>(), systemDescriptor, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryOutputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryOutputDescriptor.java
new file mode 100644
index 0000000..26c64f3
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryOutputDescriptor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system.descriptors;
+
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+
+/**
+ * A descriptor for an in memory output stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public class InMemoryOutputDescriptor<StreamMessageType>
+    extends OutputDescriptor<StreamMessageType, InMemoryOutputDescriptor<StreamMessageType>> {
+
+  /**
+   * Constructs an {@link OutputDescriptor} instance.
+   * @param streamId id of the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  InMemoryOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+    super(streamId, new NoOpSerde<>(), systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
new file mode 100644
index 0000000..96a8aca
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.InMemorySystemConfig;
+import org.apache.samza.system.descriptors.OutputDescriptorProvider;
+import org.apache.samza.system.descriptors.SimpleInputDescriptorProvider;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
+import org.apache.samza.config.JavaSystemConfig;
+
+/**
+ * A descriptor for InMemorySystem.
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * <p>
+ * Following system level configs are set by default
+ * <ol>
+ *   <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+ *   <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+ *   <li>"inmemory.scope = "Scope id generated to isolate the system in memory</li>
+ * </ol>
+ */
+public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+  private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName();
+  /**
+   * <p>
+   * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
+   * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
+   * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
+   * scope have the highest precedence.
+   *
+   * For this case, it generates following overridden configs
+   * <ol>
+   *      <li>"jobs.<job-name>.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+   *      <li>"jobs.<job-name>.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+   * </ol>
+   *
+   **/
+  private String inMemoryScope;
+
+  /**
+   * Constructs a new InMemorySystemDescriptor from specified components.
+   * <p>
+   * Every {@link InMemorySystemDescriptor} is configured to consume from the oldest offset, since stream is in memory and
+   * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams.
+   * <p>
+   * @param systemName unique name of the system
+   */
+  public InMemorySystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+    this.withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
+  }
+
+  @Override
+  public <StreamMessageType> InMemoryInputDescriptor<StreamMessageType> getInputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new InMemoryInputDescriptor<StreamMessageType>(streamId, this);
+  }
+
+  @Override
+  public <StreamMessageType> InMemoryOutputDescriptor<StreamMessageType> getOutputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new InMemoryOutputDescriptor<StreamMessageType>(streamId, this);
+  }
+
+  /**
+   * {@code inMemoryScope} defines the unique instance of InMemorySystem, that this system uses
+   * This method is framework use only, users are not supposed to use it
+   *
+   * @param inMemoryScope acts as a unique global identifier for this instance of InMemorySystem
+   * @return this system descriptor
+   */
+  public InMemorySystemDescriptor withInMemoryScope(String inMemoryScope) {
+    this.inMemoryScope = inMemoryScope;
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> configs = new HashMap<>(super.toConfig());
+    configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
+    configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME);
+    return configs;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
index 1954cc3..2e51f6a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -19,13 +19,13 @@
 package org.apache.samza.test.integration;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 4c8884d..672837b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -24,16 +24,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.KVSerde;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index e0097bd..ba62691 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -40,10 +40,10 @@ import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index 7696b62..e326f91 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -29,9 +29,9 @@ import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
index 4caf266..ef17a22 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
@@ -21,12 +21,12 @@ package org.apache.samza.test.framework;
 
 import java.util.Arrays;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 
 public class BroadcastAssertApp implements StreamApplication {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
index 7deb4d7..649c032 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import org.apache.samza.application.TaskApplication;
-import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -31,8 +31,8 @@ import org.apache.samza.config.ZkConfig;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.task.ClosableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 6188381..a442140 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Random;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -33,22 +33,23 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.table.Table;
 import org.apache.samza.test.controlmessages.TestData;
-import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.table.PageViewToProfileJoinFunction;
 import org.apache.samza.test.table.TestTableData;
 import org.junit.Assert;
 import org.junit.Test;
-import static org.apache.samza.test.controlmessages.TestData.*;
+
+import static org.apache.samza.test.controlmessages.TestData.PageView;
 
 public class StreamApplicationIntegrationTest {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 003b200..aa9e107 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -28,35 +28,36 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.TaskApplication;
-import org.apache.samza.application.TaskApplicationDescriptor;
 import org.apache.samza.context.Context;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.table.TestTableData;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Assert;
 import org.junit.Test;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.PageView;
+
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.Profile;
 
 
 public class StreamTaskIntegrationTest {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
index 1644a0f..20f18ee 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
@@ -25,14 +25,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 
 public class TestSchedulingApp implements StreamApplication {

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index c63c11f..dda31ea 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -23,7 +23,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -35,8 +35,8 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.operator.data.AdClick;
 import org.apache.samza.test.operator.data.PageView;