You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:42 UTC
[02/12] samza git commit: Consolidating package names for System,
Stream, Application and Table descriptors.
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
deleted file mode 100644
index 399f9fd..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.Map;
-import junit.framework.Assert;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.util.NoOpMetricsRegistry;
-import org.junit.Test;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestBaseLocalStoreBackedTableProvider {
-
- @Test
- public void testInit() {
- Context context = mock(Context.class);
- TaskContext taskContext = mock(TaskContext.class);
- when(context.getTaskContext()).thenReturn(taskContext);
- when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
- when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
-
- TableSpec tableSpec = mock(TableSpec.class);
- when(tableSpec.getId()).thenReturn("t1");
-
- TableProvider tableProvider = createTableProvider(tableSpec);
- tableProvider.init(context);
- Assert.assertNotNull(tableProvider.getTable());
- }
-
- @Test(expected = SamzaException.class)
- public void testInitFail() {
- TableSpec tableSpec = mock(TableSpec.class);
- when(tableSpec.getId()).thenReturn("t1");
- TableProvider tableProvider = createTableProvider(tableSpec);
- Assert.assertNotNull(tableProvider.getTable());
- }
-
- @Test
- public void testGenerateCommonStoreConfig() {
- Map<String, String> generatedConfig = new HashMap<>();
- generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
- generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
-
- TableSpec tableSpec = mock(TableSpec.class);
- when(tableSpec.getId()).thenReturn("t1");
-
- TableProvider tableProvider = createTableProvider(tableSpec);
- Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
- Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
- Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
- }
-
- @Test
- public void testChangelogDisabled() {
- TableSpec tableSpec = createTableDescriptor("t1")
- .getTableSpec();
-
- TableProvider tableProvider = createTableProvider(tableSpec);
- Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig());
- Assert.assertEquals(2, tableConfig.size());
- Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM(), "t1")));
- }
-
- @Test
- public void testChangelogEnabled() {
- TableSpec tableSpec = createTableDescriptor("t1")
- .withChangelogEnabled()
- .getTableSpec();
-
- Map<String, String> jobConfig = new HashMap<>();
- jobConfig.put(JobConfig.JOB_NAME(), "test-job");
- jobConfig.put(JobConfig.JOB_ID(), "10");
-
- TableProvider tableProvider = createTableProvider(tableSpec);
- Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(jobConfig), new MapConfig());
- Assert.assertEquals(3, tableConfig.size());
- Assert.assertEquals("test-job-10-table-t1", String.format(
- tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))));
- }
-
- @Test
- public void testChangelogEnabledWithCustomParameters() {
- TableSpec tableSpec = createTableDescriptor("t1")
- .withChangelogStream("my-stream")
- .withChangelogReplicationFactor(100)
- .getTableSpec();
-
- TableProvider tableProvider = createTableProvider(tableSpec);
- Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig());
- Assert.assertEquals(4, tableConfig.size());
- Assert.assertEquals("my-stream", String.format(
- tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))));
- Assert.assertEquals("100", String.format(
- tableConfig.get(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), "t1"))));
- }
-
- private TableProvider createTableProvider(TableSpec tableSpec) {
- return new BaseLocalStoreBackedTableProvider(tableSpec) {
- @Override
- public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
- return generateCommonStoreConfig(jobConfig, generatedConfig);
- }
- };
- }
-
- private BaseLocalStoreBackedTableDescriptor createTableDescriptor(String tableId) {
- return new BaseLocalStoreBackedTableDescriptor(tableId) {
- @Override
- public TableSpec getTableSpec() {
- validate();
- Map<String, String> tableSpecConfig = new HashMap<>();
- generateTableSpecConfig(tableSpecConfig);
- return new TableSpec(tableId, serde, null, tableSpecConfig,
- sideInputs, sideInputsProcessor);
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestBaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestBaseLocalStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestBaseLocalStoreBackedTableProvider.java
new file mode 100644
index 0000000..559f2e9
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestBaseLocalStoreBackedTableProvider.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestBaseLocalStoreBackedTableProvider {
+
+ @Test
+ public void testInit() {
+ Context context = mock(Context.class);
+ TaskContext taskContext = mock(TaskContext.class);
+ when(context.getTaskContext()).thenReturn(taskContext);
+ when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
+ when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+
+ TableSpec tableSpec = mock(TableSpec.class);
+ when(tableSpec.getId()).thenReturn("t1");
+
+ TableProvider tableProvider = createTableProvider(tableSpec);
+ tableProvider.init(context);
+ Assert.assertNotNull(tableProvider.getTable());
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testInitFail() {
+ TableSpec tableSpec = mock(TableSpec.class);
+ when(tableSpec.getId()).thenReturn("t1");
+ TableProvider tableProvider = createTableProvider(tableSpec);
+ Assert.assertNotNull(tableProvider.getTable());
+ }
+
+ @Test
+ public void testGenerateCommonStoreConfig() {
+ Map<String, String> generatedConfig = new HashMap<>();
+ generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+ generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+ TableSpec tableSpec = mock(TableSpec.class);
+ when(tableSpec.getId()).thenReturn("t1");
+
+ TableProvider tableProvider = createTableProvider(tableSpec);
+ Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
+ Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+ Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+ }
+
+ @Test
+ public void testChangelogDisabled() {
+ TableSpec tableSpec = createTableDescriptor("t1")
+ .getTableSpec();
+
+ TableProvider tableProvider = createTableProvider(tableSpec);
+ Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig());
+ Assert.assertEquals(2, tableConfig.size());
+ Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM(), "t1")));
+ }
+
+ @Test
+ public void testChangelogEnabled() {
+ TableSpec tableSpec = createTableDescriptor("t1")
+ .withChangelogEnabled()
+ .getTableSpec();
+
+ Map<String, String> jobConfig = new HashMap<>();
+ jobConfig.put(JobConfig.JOB_NAME(), "test-job");
+ jobConfig.put(JobConfig.JOB_ID(), "10");
+
+ TableProvider tableProvider = createTableProvider(tableSpec);
+ Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(jobConfig), new MapConfig());
+ Assert.assertEquals(3, tableConfig.size());
+ Assert.assertEquals("test-job-10-table-t1", String.format(
+ tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))));
+ }
+
+ @Test
+ public void testChangelogEnabledWithCustomParameters() {
+ TableSpec tableSpec = createTableDescriptor("t1")
+ .withChangelogStream("my-stream")
+ .withChangelogReplicationFactor(100)
+ .getTableSpec();
+
+ TableProvider tableProvider = createTableProvider(tableSpec);
+ Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), new MapConfig());
+ Assert.assertEquals(4, tableConfig.size());
+ Assert.assertEquals("my-stream", String.format(
+ tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), "t1"))));
+ Assert.assertEquals("100", String.format(
+ tableConfig.get(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), "t1"))));
+ }
+
+ private TableProvider createTableProvider(TableSpec tableSpec) {
+ return new BaseLocalStoreBackedTableProvider(tableSpec) {
+ @Override
+ public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
+ return generateCommonStoreConfig(jobConfig, generatedConfig);
+ }
+ };
+ }
+
+ private BaseLocalStoreBackedTableDescriptor createTableDescriptor(String tableId) {
+ return new BaseLocalStoreBackedTableDescriptor(tableId) {
+ @Override
+ public TableSpec getTableSpec() {
+ validate();
+ Map<String, String> tableSpecConfig = new HashMap<>();
+ generateTableSpecConfig(tableSpecConfig);
+ return new TableSpec(tableId, serde, null, tableSpecConfig,
+ sideInputs, sideInputsProcessor);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index a1c1bdd..7faff17 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -21,7 +21,7 @@ package org.apache.samza.sql.impl;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.sql.data.SamzaSqlCompositeKey;
@@ -29,7 +29,7 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 3a73e09..8636736 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang.Validate;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.system.SystemStream;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index fd1a2a8..1caefe6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Set;
import org.apache.calcite.rel.RelRoot;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.translator.QueryTranslator;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
index 435a2cc..94b2296 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
@@ -25,14 +25,14 @@ import java.util.Optional;
import org.apache.calcite.rel.core.TableModify;
import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.Context;
+import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 817f145..c9365cc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -31,14 +31,11 @@ import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
@@ -50,7 +47,10 @@ import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.TableDescriptor;
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index be94160..cc765bd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -23,12 +23,12 @@ import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.core.TableScan;
import org.apache.commons.lang.Validate;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.Context;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index a7ab663..98cc92e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -32,9 +32,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index 4c78b5a..14314c8 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -26,8 +26,8 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.NotImplementedException;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
@@ -36,13 +36,13 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.Table;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.utils.BaseTableProvider;
+import org.apache.samza.table.utils.descriptors.BaseTableProvider;
public class TestIOResolverFactory implements SqlIOResolverFactory {
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index 07ebe33..b9b0c96 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import org.apache.calcite.DataContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index f0a8a89..dcd7023 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -33,10 +33,10 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
@@ -49,7 +49,7 @@ import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 2ed7a00..68db1e4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -31,7 +31,7 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.calcite.util.Pair;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 7a194db..cd81e0d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
index a74993f..4943504 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
@@ -22,14 +22,13 @@ package org.apache.samza.sql.translator;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.TableImpl;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.storage.kv.RocksDbTableProvider;
-import org.apache.samza.table.Table;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableProvider;
import org.apache.samza.table.TableSpec;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
index 7d5e0d2..ba9c8b3 100644
--- a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
@@ -21,7 +21,7 @@ package org.apache.samza.example;
import java.time.Duration;
import java.util.HashMap;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.triggers.Triggers;
@@ -33,9 +33,9 @@ import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.util.CommandLine;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
index 4ef2402..7721d44 100644
--- a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -20,7 +20,7 @@
package org.apache.samza.example;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
@@ -29,9 +29,9 @@ import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.util.CommandLine;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
index e991c4e..4923b7d 100644
--- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
@@ -36,9 +36,9 @@ import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.util.CommandLine;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
index fe018f3..ac0db36 100644
--- a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -22,7 +22,7 @@ package org.apache.samza.example;
import com.google.common.collect.ImmutableList;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.runtime.ApplicationRunner;
@@ -31,9 +31,9 @@ import org.apache.samza.operators.KV;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.util.CommandLine;
public class MergeExample implements StreamApplication {
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 8d3812b..ea38984 100644
--- a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -20,7 +20,7 @@ package org.apache.samza.example;
import java.time.Duration;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.functions.JoinFunction;
@@ -29,9 +29,9 @@ import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.util.CommandLine;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index e2ebc93..1476c81 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -19,7 +19,7 @@
package org.apache.samza.example;
import java.time.Duration;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
@@ -36,9 +36,9 @@ import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.util.CommandLine;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
index 8a0ca28..2cf3ac3 100644
--- a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -20,7 +20,7 @@ package org.apache.samza.example;
import java.time.Duration;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
@@ -32,9 +32,9 @@ import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.util.CommandLine;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
index 73dc10a..8f6c6f8 100644
--- a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
@@ -18,18 +18,18 @@
*/
package org.apache.samza.example;
-import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.StreamTaskFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
index 2f4c19c..51089f7 100644
--- a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -21,7 +21,7 @@ package org.apache.samza.example;
import java.time.Duration;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
@@ -34,9 +34,9 @@ import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.util.CommandLine;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
index 42379f3..b47bf0a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
@@ -24,7 +24,7 @@ import java.time.Duration;
import java.util.stream.Collectors;
import java.util.List;
import java.util.Map;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -32,8 +32,8 @@ import static org.junit.Assert.assertThat;
/**
- * Assertion utils on the content of a stream described by
- * {@link org.apache.samza.operators.descriptors.base.stream.StreamDescriptor}.
+ * Assertion utils on the content of a stream described by a
+ * {@link org.apache.samza.system.descriptors.StreamDescriptor}.
*/
public class StreamAssert {
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 5cd47de..ba7128a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -58,9 +58,9 @@ import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;
-import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
deleted file mode 100644
index 6065bf0..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.serializers.NoOpSerde;
-
-/**
- * A descriptor for an in memory stream of messages that can either have single or multiple partitions.
- * <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
- * <p>
- * @param <StreamMessageType> type of messages in input stream
- */
-public class InMemoryInputDescriptor<StreamMessageType>
- extends InputDescriptor<StreamMessageType, InMemoryInputDescriptor<StreamMessageType>> {
- /**
- * Constructs a new InMemoryInputDescriptor from specified components.
- * @param systemDescriptor name of the system stream is associated with
- * @param streamId name of the stream
- */
- InMemoryInputDescriptor(String streamId, InMemorySystemDescriptor systemDescriptor) {
- super(streamId, new NoOpSerde<>(), systemDescriptor, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
deleted file mode 100644
index 75fe7ae..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.system;
-
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.NoOpSerde;
-
-/**
- * A descriptor for an in memory output stream.
- * <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
- * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream.
- */
-public class InMemoryOutputDescriptor<StreamMessageType>
- extends OutputDescriptor<StreamMessageType, InMemoryOutputDescriptor<StreamMessageType>> {
-
- /**
- * Constructs an {@link OutputDescriptor} instance.
- * @param streamId id of the stream
- * @param systemDescriptor system descriptor this stream descriptor was obtained from
- */
- InMemoryOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
- super(streamId, new NoOpSerde<>(), systemDescriptor);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
deleted file mode 100644
index 77948f6..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.system;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.InMemorySystemConfig;
-import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.inmemory.InMemorySystemFactory;
-import org.apache.samza.config.JavaSystemConfig;
-
-/**
- * A descriptor for InMemorySystem.
- * System properties configured using a descriptor override corresponding properties provided in configuration.
- * <p>
- * Following system level configs are set by default
- * <ol>
- * <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
- * <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
- * <li>"inmemory.scope = "Scope id generated to isolate the system in memory</li>
- * </ol>
- */
-public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor>
- implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
- private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName();
- /**
- * <p>
- * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
- * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
- * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
- * scope have the highest precedence.
- *
- * For this case, it generates following overridden configs
- * <ol>
- * <li>"jobs.<job-name>.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
- * <li>"jobs.<job-name>.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
- * </ol>
- *
- **/
- private String inMemoryScope;
-
- /**
- * Constructs a new InMemorySystemDescriptor from specified components.
- * <p>
- * Every {@link InMemorySystemDescriptor} is configured to consume from the oldest offset, since stream is in memory and
- * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams.
- * <p>
- * @param systemName unique name of the system
- */
- public InMemorySystemDescriptor(String systemName) {
- super(systemName, FACTORY_CLASS_NAME, null, null);
- this.withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
- }
-
- @Override
- public <StreamMessageType> InMemoryInputDescriptor<StreamMessageType> getInputDescriptor(
- String streamId, Serde<StreamMessageType> serde) {
- return new InMemoryInputDescriptor<StreamMessageType>(streamId, this);
- }
-
- @Override
- public <StreamMessageType> InMemoryOutputDescriptor<StreamMessageType> getOutputDescriptor(
- String streamId, Serde<StreamMessageType> serde) {
- return new InMemoryOutputDescriptor<StreamMessageType>(streamId, this);
- }
-
- /**
- * {@code inMemoryScope} defines the unique instance of InMemorySystem, that this system uses
- * This method is framework use only, users are not supposed to use it
- *
- * @param inMemoryScope acts as a unique global identifier for this instance of InMemorySystem
- * @return this system descriptor
- */
- public InMemorySystemDescriptor withInMemoryScope(String inMemoryScope) {
- this.inMemoryScope = inMemoryScope;
- return this;
- }
-
- @Override
- public Map<String, String> toConfig() {
- HashMap<String, String> configs = new HashMap<>(super.toConfig());
- configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
- configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME);
- return configs;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
new file mode 100644
index 0000000..0e49550
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system.descriptors;
+
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+
+/**
+ * A descriptor for an in memory stream of messages that can either have single or multiple partitions.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ * <p>
+ * @param <StreamMessageType> type of messages in input stream
+ */
+public class InMemoryInputDescriptor<StreamMessageType>
+ extends InputDescriptor<StreamMessageType, InMemoryInputDescriptor<StreamMessageType>> {
+ /**
+ * Constructs a new InMemoryInputDescriptor from specified components.
+ * @param streamId id of the stream
+ * @param systemDescriptor descriptor of the system this stream is associated with
+ */
+ InMemoryInputDescriptor(String streamId, InMemorySystemDescriptor systemDescriptor) {
+ super(streamId, new NoOpSerde<>(), systemDescriptor, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryOutputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryOutputDescriptor.java
new file mode 100644
index 0000000..26c64f3
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryOutputDescriptor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system.descriptors;
+
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+
+/**
+ * A descriptor for an in memory output stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public class InMemoryOutputDescriptor<StreamMessageType>
+ extends OutputDescriptor<StreamMessageType, InMemoryOutputDescriptor<StreamMessageType>> {
+
+ /**
+ * Constructs an {@link OutputDescriptor} instance.
+ * @param streamId id of the stream
+ * @param systemDescriptor system descriptor this stream descriptor was obtained from
+ */
+ InMemoryOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+ super(streamId, new NoOpSerde<>(), systemDescriptor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
new file mode 100644
index 0000000..96a8aca
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.InMemorySystemConfig;
+import org.apache.samza.system.descriptors.OutputDescriptorProvider;
+import org.apache.samza.system.descriptors.SimpleInputDescriptorProvider;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
+import org.apache.samza.config.JavaSystemConfig;
+
+/**
+ * A descriptor for InMemorySystem.
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * <p>
+ * The following system-level configs are set by default:
+ * <ol>
+ * <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+ * <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+ * <li>"inmemory.scope" = Scope id generated to isolate the system in memory</li>
+ * </ol>
+ */
+public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor>
+ implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+ private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName();
+ /**
+ * <p>
+ * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
+ * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
+ * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
+ * scope have the highest precedence.
+ *
+ * In this case, it generates the following overridden configs:
+ * <ol>
+ * <li>"jobs.<job-name>.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+ * <li>"jobs.<job-name>.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+ * </ol>
+ *
+ **/
+ private String inMemoryScope;
+
+ /**
+ * Constructs a new InMemorySystemDescriptor from specified components.
+ * <p>
+ * Every {@link InMemorySystemDescriptor} is configured to consume from the oldest offset, since the stream is in memory and
+ * is used for testing purposes. The system uses {@link InMemorySystemFactory} to initialize in-memory streams.
+ * <p>
+ * @param systemName unique name of the system
+ */
+ public InMemorySystemDescriptor(String systemName) {
+ super(systemName, FACTORY_CLASS_NAME, null, null);
+ this.withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
+ }
+
+ @Override
+ public <StreamMessageType> InMemoryInputDescriptor<StreamMessageType> getInputDescriptor(
+ String streamId, Serde<StreamMessageType> serde) {
+ return new InMemoryInputDescriptor<StreamMessageType>(streamId, this);
+ }
+
+ @Override
+ public <StreamMessageType> InMemoryOutputDescriptor<StreamMessageType> getOutputDescriptor(
+ String streamId, Serde<StreamMessageType> serde) {
+ return new InMemoryOutputDescriptor<StreamMessageType>(streamId, this);
+ }
+
+ /**
+ * {@code inMemoryScope} defines the unique instance of InMemorySystem that this system uses.
+ * This method is for framework use only; users are not supposed to call it.
+ *
+ * @param inMemoryScope acts as a unique global identifier for this instance of InMemorySystem
+ * @return this system descriptor
+ */
+ public InMemorySystemDescriptor withInMemoryScope(String inMemoryScope) {
+ this.inMemoryScope = inMemoryScope;
+ return this;
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+ HashMap<String, String> configs = new HashMap<>(super.toConfig());
+ configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
+ configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME);
+ return configs;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
index 1954cc3..2e51f6a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -19,13 +19,13 @@
package org.apache.samza.test.integration;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 4c8884d..672837b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -24,16 +24,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.KV;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.KVSerde;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index e0097bd..ba62691 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -29,7 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
@@ -40,10 +40,10 @@ import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.operators.KV;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.operators.impl.InputOperatorImpl;
import org.apache.samza.operators.impl.OperatorImpl;
import org.apache.samza.operators.impl.OperatorImplGraph;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index 7696b62..e326f91 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -29,9 +29,9 @@ import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.operators.KV;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
index 4caf266..ef17a22 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
@@ -21,12 +21,12 @@ package org.apache.samza.test.framework;
import java.util.Arrays;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.test.operator.data.PageView;
public class BroadcastAssertApp implements StreamApplication {
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
index 7deb4d7..649c032 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.samza.application.TaskApplication;
-import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
@@ -31,8 +31,8 @@ import org.apache.samza.config.ZkConfig;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 6188381..a442140 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Random;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
@@ -33,22 +33,23 @@ import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.table.Table;
import org.apache.samza.test.controlmessages.TestData;
-import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.table.PageViewToProfileJoinFunction;
import org.apache.samza.test.table.TestTableData;
import org.junit.Assert;
import org.junit.Test;
-import static org.apache.samza.test.controlmessages.TestData.*;
+
+import static org.apache.samza.test.controlmessages.TestData.PageView;
public class StreamApplicationIntegrationTest {
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 003b200..aa9e107 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -28,35 +28,36 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.TaskApplication;
-import org.apache.samza.application.TaskApplicationDescriptor;
import org.apache.samza.context.Context;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.table.TestTableData;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.PageView;
+
import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.Profile;
public class StreamTaskIntegrationTest {
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
index 1644a0f..20f18ee 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
@@ -25,14 +25,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.test.operator.data.PageView;
public class TestSchedulingApp implements StreamApplication {
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index c63c11f..dda31ea 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -23,7 +23,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
@@ -35,8 +35,8 @@ import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.test.operator.data.AdClick;
import org.apache.samza.test.operator.data.PageView;