You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/07 06:36:09 UTC

[3/9] samza git commit: SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 6df3421..5aaad26 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -283,7 +283,7 @@ class JoinTranslator {
     // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
     // message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
     Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
-        context.getStreamGraph().getTable(sourceConfig.getTableDescriptor().get());
+        context.getStreamAppDescriptor().getTable(sourceConfig.getTableDescriptor().get());
 
     relOutputStream
         .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index c422130..fe4d8da 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.translator;
 
 import java.util.Map;
 import java.util.Optional;
-
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelShuttleImpl;
@@ -30,37 +29,37 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
-import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.descriptors.GenericOutputDescriptor;;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.table.Table;
+import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * This class is used to populate the StreamGraph using the SQL queries.
+ * This class is used to populate the {@link StreamApplicationDescriptor} using the SQL queries.
  * This class contains the core of the SamzaSQL control code that converts the SQL statements to calcite relational graph.
- * It then walks the relational graph and then populates the Samza's {@link StreamGraph} accordingly.
+ * It then walks the relational graph and then populates the Samza's {@link StreamApplicationDescriptor} accordingly.
  */
 public class QueryTranslator {
   private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);
@@ -96,13 +95,13 @@ public class QueryTranslator {
     this.converters = sqlConfig.getSamzaRelConverters();
   }
 
-  public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamGraph streamGraph) {
+  public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) {
     QueryPlanner planner =
         new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
             sqlConfig.getUdfMetadata());
     final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig);
     final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
-    final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext, this.converters);
+    final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
     final RelNode node = relRoot.project();
     final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
 
@@ -159,9 +158,9 @@ public class QueryTranslator {
       String systemName = sinkConfig.getSystemName();
       DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
       GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
-      outputStream.sendTo(streamGraph.getOutputStream(osd));
+      outputStream.sendTo(appDesc.getOutputStream(osd));
     } else {
-      Table outputTable = streamGraph.getTable(tableDescriptor.get());
+      Table outputTable = appDesc.getTable(tableDescriptor.get());
       if (outputTable == null) {
         String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
         LOG.error(msg);
@@ -170,7 +169,7 @@ public class QueryTranslator {
       outputStream.sendTo(outputTable);
     }
 
-    streamGraph.withContextManager(new ContextManager() {
+    appDesc.withContextManager(new ContextManager() {
       @Override
       public void init(Config config, TaskContext taskContext) {
         taskContext.setUserContext(context.clone());

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 46e0840..2dc28be 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -23,10 +23,10 @@ import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.commons.lang.Validate;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
@@ -34,8 +34,8 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -73,7 +73,7 @@ class ScanTranslator {
   }
 
   void translate(final TableScan tableScan, final TranslatorContext context) {
-    StreamGraph streamGraph = context.getStreamGraph();
+    StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
     String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
 
@@ -85,7 +85,7 @@ class ScanTranslator {
     KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
     DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
     GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde);
-    MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(isd);
+    MessageStream<KV<Object, Object>> inputStream = streamAppDesc.getInputStream(isd);
     MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName));
 
     context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index e622d55..a7ab663 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.translator;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import java.util.TimeZone;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -33,8 +32,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
@@ -42,13 +41,13 @@ import org.apache.samza.sql.interfaces.SamzaRelConverter;
 
 
 /**
- * State that is maintained while translating the Calcite relational graph to Samza {@link StreamGraph}.
+ * State that is maintained while translating the Calcite relational graph to Samza {@link StreamApplicationDescriptor}.
  */
 public class TranslatorContext implements Cloneable {
   /**
    * The internal variables that are shared among all cloned {@link TranslatorContext}
    */
-  private final StreamGraph streamGraph;
+  private final StreamApplicationDescriptor streamAppDesc;
   private final RexToJavaCompiler compiler;
   private final Map<String, SamzaRelConverter> relSamzaConverters;
   private final Map<Integer, MessageStream> messageStreams;
@@ -122,7 +121,7 @@ public class TranslatorContext implements Cloneable {
    * @param other the original object to copy from
    */
   private TranslatorContext(TranslatorContext other) {
-    this.streamGraph  = other.streamGraph;
+    this.streamAppDesc  = other.streamAppDesc;
     this.compiler = other.compiler;
     this.relSamzaConverters = other.relSamzaConverters;
     this.messageStreams = other.messageStreams;
@@ -134,13 +133,13 @@ public class TranslatorContext implements Cloneable {
 
   /**
    * Create the instance of TranslatorContext
-   * @param streamGraph Samza's streamGraph that is populated during the translation.
+   * @param stramAppDesc Samza's {@link StreamApplicationDescriptor} that is populated during the translation.
    * @param relRoot Root of the relational graph from calcite.
    * @param executionContext the execution context
    * @param converters the map of schema to RelData converters
    */
-  TranslatorContext(StreamGraph streamGraph, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) {
-    this.streamGraph = streamGraph;
+  TranslatorContext(StreamApplicationDescriptor stramAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) {
+    this.streamAppDesc = stramAppDesc;
     this.compiler = createExpressionCompiler(relRoot);
     this.executionContext = executionContext;
     this.dataContext = new DataContextImpl();
@@ -155,8 +154,8 @@ public class TranslatorContext implements Cloneable {
    *
    * @return the stream graph
    */
-  public StreamGraph getStreamGraph() {
-    return streamGraph;
+  public StreamApplicationDescriptor getStreamAppDescriptor() {
+    return streamAppDesc;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
index c4cacbd..cc339f1 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
@@ -45,8 +45,8 @@ public class TestSamzaSqlTable {
     String sql1 = "Insert into testDb.testTable.`$table` select id, name from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    appRunnable.runAndWaitForFinish();
 
     Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size());
   }
@@ -61,8 +61,8 @@ public class TestSamzaSqlTable {
     String sql1 = "Insert into testDb.testTable.`$table` select id __key__, name from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
-    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-    runner.runAndWaitForFinish();
+    SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    appRunnable.runAndWaitForFinish();
 
     Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index b6dcac5..9fab5d5 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -25,11 +25,9 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.junit.Assert;
 
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.junit.Test;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index 88ce443..e7c2195 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -25,12 +25,12 @@ import java.util.HashSet;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
@@ -73,7 +73,7 @@ public class TestFilterTranslator extends TranslatorTestBase {
     when(mockFilter.getInput()).thenReturn(mockInput);
     when(mockInput.getId()).thenReturn(1);
     when(mockFilter.getId()).thenReturn(2);
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
     MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
     when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
@@ -95,7 +95,7 @@ public class TestFilterTranslator extends TranslatorTestBase {
     assertNotNull(filterSpec);
     assertEquals(filterSpec.getOpCode(), OperatorSpec.OpCode.FILTER);
 
-    // Verify that the init() method will establish the context for the filter function
+    // Verify that the init() method will establish the context for the filter function
     Config mockConfig = mock(Config.class);
     TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
         new HashSet<>(), null, null, null, null, null, null);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 7395a3d..f0a8a89 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -33,9 +33,9 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -132,22 +132,22 @@ public class TestJoinTranslator extends TranslatorTestBase {
     when(mockRightInput.getRowType()).thenReturn(mockRightRowType);
     when(mockRightRowType.getFieldNames()).thenReturn(rightStreamFieldNames);
 
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockLeftInputOp = mock(OperatorSpec.class);
-    MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockGraph, mockLeftInputOp);
+    MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockAppDesc, mockLeftInputOp);
     when(mockContext.getMessageStream(eq(mockLeftInput.getId()))).thenReturn(mockLeftInputStream);
     OperatorSpec<Object, SamzaSqlRelMessage> mockRightInputOp = mock(OperatorSpec.class);
-    MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockGraph, mockRightInputOp);
+    MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockAppDesc, mockRightInputOp);
     when(mockContext.getMessageStream(eq(mockRightInput.getId()))).thenReturn(mockRightInputStream);
-    when(mockContext.getStreamGraph()).thenReturn(mockGraph);
+    when(mockContext.getStreamAppDescriptor()).thenReturn(mockAppDesc);
 
     InputOperatorSpec mockInputOp = mock(InputOperatorSpec.class);
     OutputStreamImpl mockOutputStream = mock(OutputStreamImpl.class);
     when(mockInputOp.isKeyed()).thenReturn(true);
     when(mockOutputStream.isKeyed()).thenReturn(true);
     IntermediateMessageStreamImpl
-        mockPartitionedStream = new IntermediateMessageStreamImpl(mockGraph, mockInputOp, mockOutputStream);
-    when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream);
+        mockPartitionedStream = new IntermediateMessageStreamImpl(mockAppDesc, mockInputOp, mockOutputStream);
+    when(mockAppDesc.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream);
 
     doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class));
     RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
@@ -155,7 +155,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
     Expression mockExpr = mock(Expression.class);
     when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
 
-    doAnswer(this.getRegisteredTableAnswer()).when(mockGraph).getTable(any(RocksDbTableDescriptor.class));
+    doAnswer(this.getRegisteredTableAnswer()).when(mockAppDesc).getTable(any(RocksDbTableDescriptor.class));
     when(mockJoin.getJoinType()).thenReturn(JoinRelType.INNER);
     SqlIOResolver mockResolver = mock(SqlIOResolver.class);
     SqlIOConfig mockIOConfig = mock(SqlIOConfig.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index f84dd3f..1acfc47 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -32,12 +32,12 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.calcite.util.Pair;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
@@ -91,9 +91,9 @@ public class TestProjectTranslator extends TranslatorTestBase {
     List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
     namedProjects.add(Pair.of(mockRexField, "test_field"));
     when(mockProject.getNamedProjects()).thenReturn(namedProjects);
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
-    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
     when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
     doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
     RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
@@ -113,7 +113,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
     assertNotNull(projectSpec);
     assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
 
-    // Verify that the init() method will establish the context for the map function
+    // Verify that the init() method will establish the context for the map function
     Config mockConfig = mock(Config.class);
     TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
         new HashSet<>(), null, null, null, null, null, null);
@@ -183,7 +183,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
     flattenProjects.add(mockFlattenProject);
     when(mockProject.getProjects()).thenReturn(flattenProjects);
 
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = new OperatorSpec(OperatorSpec.OpCode.INPUT, "1") {
 
       @Override
@@ -197,7 +197,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
       }
     };
 
-    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
     when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
     doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
     RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
@@ -248,7 +248,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
     assertNotNull(projectSpec);
     assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
 
-    // Verify that the init() method will establish the context for the map function
+    // Verify that the init() method will establish the context for the map function
     Config mockConfig = mock(Config.class);
     TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
         new HashSet<>(), null, null, null, null, null, null);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 1776067..c9f59e6 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -19,20 +19,20 @@
 
 package org.apache.samza.sql.translator;
 
-import java.util.HashSet;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
@@ -49,8 +49,7 @@ public class TestQueryTranslator {
   private void validateClonedTranslatorContext(TranslatorContext originContext, TranslatorContext clonedContext) {
     Assert.assertNotEquals(originContext, clonedContext);
     Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler());
-    Assert.assertTrue(originContext.getStreamGraph() == clonedContext.getStreamGraph());
-    Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler());
+    Assert.assertTrue(originContext.getStreamAppDescriptor() == clonedContext.getStreamAppDescriptor());
     Assert.assertTrue(Whitebox.getInternalState(originContext, "relSamzaConverters") == Whitebox.getInternalState(clonedContext, "relSamzaConverters"));
     Assert.assertTrue(Whitebox.getInternalState(originContext, "messageStreams") == Whitebox.getInternalState(clonedContext, "messageStreams"));
     Assert.assertTrue(Whitebox.getInternalState(originContext, "relNodes") == Whitebox.getInternalState(clonedContext, "relNodes"));
@@ -85,10 +84,10 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(descriptor -> { },samzaConfig);
+
+    translator.translate(queryInfo, appDesc);
+    OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
     String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -97,29 +96,29 @@ public class TestQueryTranslator {
     String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
     String outputSystem = streamConfig.getSystem(outputStreamId);
     String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
-    
+
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
     Assert.assertEquals("testavro", outputSystem);
     Assert.assertEquals("outputTopic", outputPhysicalName);
     Assert.assertEquals(1, specGraph.getInputOperators().size());
-    
+
     Assert.assertEquals("testavro", inputSystem);
     Assert.assertEquals("SIMPLE1", inputPhysicalName);
 
-    validatePerTaskContextInit(graphSpec, samzaConfig);
+    validatePerTaskContextInit(appDesc, samzaConfig);
   }
 
-  private void validatePerTaskContextInit(StreamGraphSpec graphSpec, Config samzaConfig) {
+  private void validatePerTaskContextInit(StreamApplicationDescriptorImpl appDesc, Config samzaConfig) {
     // make sure that each task context would have a separate instance of cloned TranslatorContext
     TaskContextImpl testContext = new TaskContextImpl(new TaskName("Partition 1"), null, null,
         new HashSet<>(), null, null, null, null, null, null);
-    // call ContextManager.init() to instantiate the per-task TranslatorContext
-    graphSpec.getContextManager().init(samzaConfig, testContext);
+    // call ContextManager.init() to instantiate the per-task TranslatorContext
+    appDesc.getContextManager().init(samzaConfig, testContext);
     Assert.assertNotNull(testContext.getUserContext());
     Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext);
     TranslatorContext contextPerTaskOne = (TranslatorContext) testContext.getUserContext();
-    // call ContextManager.init() second time to instantiate another clone of TranslatorContext
-    graphSpec.getContextManager().init(samzaConfig, testContext);
+    // call ContextManager.init() second time to instantiate another clone of TranslatorContext
+    appDesc.getContextManager().init(samzaConfig, testContext);
     Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext);
     // validate the two copies of TranslatorContext are clones of each other
     validateClonedTranslatorContext(contextPerTaskOne, (TranslatorContext) testContext.getUserContext());
@@ -137,9 +136,10 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+    translator.translate(queryInfo, streamAppDesc);
+    OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
     String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -156,7 +156,7 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", inputSystem);
     Assert.assertEquals("COMPLEX1", inputPhysicalName);
 
-    validatePerTaskContextInit(graphSpec, samzaConfig);
+    validatePerTaskContextInit(streamAppDesc, samzaConfig);
   }
 
   @Test
@@ -168,10 +168,10 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+    translator.translate(queryInfo, streamAppDesc);
+    OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
     String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -188,7 +188,7 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", inputSystem);
     Assert.assertEquals("COMPLEX1", inputPhysicalName);
 
-    validatePerTaskContextInit(graphSpec, samzaConfig);
+    validatePerTaskContextInit(streamAppDesc, samzaConfig);
   }
 
   @Test (expected = SamzaException.class)
@@ -204,9 +204,9 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -223,9 +223,9 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = IllegalStateException.class)
@@ -242,9 +242,9 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -261,9 +261,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -278,9 +277,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -297,9 +295,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -317,9 +314,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -336,9 +332,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -355,9 +350,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -374,9 +368,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -393,9 +386,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -416,9 +408,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -435,9 +426,8 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 
   @Test
@@ -454,10 +444,10 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+    translator.translate(queryInfo, streamAppDesc);
+    OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
     String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -490,7 +480,7 @@ public class TestQueryTranslator {
     Assert.assertEquals("kafka", input3System);
     Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
-    validatePerTaskContextInit(graphSpec, samzaConfig);
+    validatePerTaskContextInit(streamAppDesc, samzaConfig);
   }
 
   @Test
@@ -507,11 +497,11 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
 
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+    translator.translate(queryInfo, streamAppDesc);
+
+    OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
     String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -544,7 +534,7 @@ public class TestQueryTranslator {
     Assert.assertEquals("kafka", input3System);
     Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
-    validatePerTaskContextInit(graphSpec, samzaConfig);
+    validatePerTaskContextInit(streamAppDesc, samzaConfig);
   }
 
   @Test
@@ -561,11 +551,10 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
 
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+    OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
     String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -598,7 +587,7 @@ public class TestQueryTranslator {
     Assert.assertEquals("kafka", input3System);
     Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
-    validatePerTaskContextInit(graphSpec, samzaConfig);
+    validatePerTaskContextInit(streamAppDesc, samzaConfig);
   }
 
   @Test
@@ -615,10 +604,10 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+    translator.translate(queryInfo, streamAppDesc);
+    OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     Assert.assertEquals(1, specGraph.getInputOperators().size());
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
@@ -639,8 +628,7 @@ public class TestQueryTranslator {
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamGraphSpec
-        graphSpec = new StreamGraphSpec(samzaConfig);
-    translator.translate(queryInfo, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    translator.translate(queryInfo, streamAppDesc);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
index d7f805d..7d5e0d2 100644
--- a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
@@ -19,15 +19,17 @@
 package org.apache.samza.example;
 
 import java.time.Duration;
+import java.util.HashMap;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -46,14 +48,14 @@ public class AppWithGlobalConfigExample implements StreamApplication {
   public static void main(String[] args) {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
-    AppWithGlobalConfigExample app = new AppWithGlobalConfigExample();
-    runner.run(app);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new AppWithGlobalConfigExample(), config);
+
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -63,12 +65,15 @@ public class AppWithGlobalConfigExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
 
-    graph.getInputStream(inputStreamDescriptor)
-        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, null, null)
+    appDesc.getInputStream(inputStreamDescriptor)
+        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1,
+            null, null)
             .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
-            .setAccumulationMode(AccumulationMode.DISCARDING), "w1")
+            .setAccumulationMode(AccumulationMode.DISCARDING), "window1")
         .map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m)))
-        .sendTo(graph.getOutputStream(outputStreamDescriptor));
+        .sendTo(appDesc.getOutputStream(outputStreamDescriptor));
+
+    appDesc.withMetricsReporterFactories(new HashMap<>());
   }
 
   class PageViewEvent {

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
index 1c1b4be..4ef2402 100644
--- a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -20,11 +20,12 @@
 package org.apache.samza.example;
 
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -43,16 +44,13 @@ public class BroadcastExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-
-    StreamApplication app = new BroadcastExample();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
-
-    runner.run(app);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new BroadcastExample(), config);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
     KafkaInputDescriptor<KV<String, PageViewEvent>> pageViewEvent =
@@ -64,10 +62,10 @@ public class BroadcastExample implements StreamApplication {
     KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream3 =
         trackingSystem.getOutputDescriptor("outStream3", serde);
 
-    MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream(pageViewEvent);
-    inputStream.filter(m -> m.key.equals("key1")).sendTo(graph.getOutputStream(outStream1));
-    inputStream.filter(m -> m.key.equals("key2")).sendTo(graph.getOutputStream(outStream2));
-    inputStream.filter(m -> m.key.equals("key3")).sendTo(graph.getOutputStream(outStream3));
+    MessageStream<KV<String, PageViewEvent>> inputStream = appDesc.getInputStream(pageViewEvent);
+    inputStream.filter(m -> m.key.equals("key1")).sendTo(appDesc.getOutputStream(outStream1));
+    inputStream.filter(m -> m.key.equals("key2")).sendTo(appDesc.getOutputStream(outStream2));
+    inputStream.filter(m -> m.key.equals("key3")).sendTo(appDesc.getOutputStream(outStream3));
   }
 
   class PageViewEvent {

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
index 4d3307b..dfc4b42 100644
--- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -18,19 +18,19 @@
  */
 package org.apache.samza.example;
 
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -51,15 +51,14 @@ public class KeyValueStoreExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    KeyValueStoreExample app = new KeyValueStoreExample();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new KeyValueStoreExample(), config);
 
-    runner.run(app);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -69,9 +68,9 @@ public class KeyValueStoreExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
 
-    graph.setDefaultSystem(trackingSystem);
-    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
-    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor);
+    appDesc.withDefaultSystem(trackingSystem);
+    MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor);
 
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
index 33d60d6..fe018f3 100644
--- a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -22,11 +22,12 @@ package org.apache.samza.example;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -41,15 +42,14 @@ public class MergeExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    MergeExample app = new MergeExample();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new MergeExample(), config);
 
-    runner.run(app);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
@@ -64,8 +64,8 @@ public class MergeExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("mergedStream", serde);
 
     MessageStream
-        .mergeAll(ImmutableList.of(graph.getInputStream(isd1), graph.getInputStream(isd2), graph.getInputStream(isd3)))
-        .sendTo(graph.getOutputStream(osd));
+        .mergeAll(ImmutableList.of(appDesc.getInputStream(isd1), appDesc.getInputStream(isd2), appDesc.getInputStream(isd3)))
+        .sendTo(appDesc.getOutputStream(osd));
   }
 
   class PageViewEvent {

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 34b5fc6..8d3812b 100644
--- a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -20,11 +20,12 @@ package org.apache.samza.example;
 
 import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -43,15 +44,13 @@ public class OrderShipmentJoinExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    OrderShipmentJoinExample app = new OrderShipmentJoinExample();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
-
-    runner.run(app);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new OrderShipmentJoinExample(), config);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<OrderRecord> orderStreamDescriptor =
@@ -62,12 +61,12 @@ public class OrderShipmentJoinExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("fulfilledOrders",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
 
-    graph.getInputStream(orderStreamDescriptor)
-        .join(graph.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
+    appDesc.getInputStream(orderStreamDescriptor)
+        .join(appDesc.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
             new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
             Duration.ofMinutes(1), "join")
         .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
-        .sendTo(graph.getOutputStream(fulfilledOrdersStreamDescriptor));
+        .sendTo(appDesc.getOutputStream(fulfilledOrdersStreamDescriptor));
 
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index dc5eb74..b540585 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -19,19 +19,20 @@
 package org.apache.samza.example;
 
 import java.time.Duration;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -51,14 +52,14 @@ public class PageViewCounterExample implements StreamApplication {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
     PageViewCounterExample app = new PageViewCounterExample();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
 
-    runner.run(app);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -68,8 +69,8 @@ public class PageViewCounterExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
 
-    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
-    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = graph.getOutputStream(outputStreamDescriptor);
+    MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = appDesc.getOutputStream(outputStreamDescriptor);
 
     SupplierFunction<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
index b776c7d..8a0ca28 100644
--- a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -20,14 +20,15 @@ package org.apache.samza.example;
 
 import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -46,15 +47,14 @@ public class RepartitionExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    RepartitionExample app = new RepartitionExample();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new RepartitionExample(), config);
 
-    runner.run(app);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -64,9 +64,9 @@ public class RepartitionExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
 
-    graph.setDefaultSystem(trackingSystem);
-    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
-    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor);
+    appDesc.withDefaultSystem(trackingSystem);
+    MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor);
 
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,
@@ -75,7 +75,6 @@ public class RepartitionExample implements StreamApplication {
             KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), "window")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
         .sendTo(pageViewEventPerMember);
-
   }
 
   class PageViewEvent {

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
new file mode 100644
index 0000000..73dc10a
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Test example of a low-level API application (i.e. {@link TaskApplication})
+ */
+public class TaskApplicationExample implements TaskApplication {
+
+  public class MyStreamTask implements StreamTask {
+
+    @Override
+    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
+        throws Exception {
+      // processing logic here
+    }
+  }
+
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TaskApplicationExample(), config);
+    runner.run();
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void describe(TaskApplicationDescriptor appDesc) {
+    // describe the input stream, output stream and table, then register them
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("tracking");
+    KafkaInputDescriptor<String> isd = ksd.getInputDescriptor("myinput", new StringSerde());
+    KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor("myout", new StringSerde());
+    TableDescriptor td = new RocksDbTableDescriptor("mytable");
+
+    appDesc.addInputStream(isd);
+    appDesc.addOutputStream(osd);
+    appDesc.addTable(td);
+    // set a task factory that returns a new MyStreamTask on each invocation
+    appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
index cbc1e8e..2f4c19c 100644
--- a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -21,16 +21,17 @@ package org.apache.samza.example;
 
 import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.kafka.KafkaInputDescriptor;
@@ -49,15 +50,14 @@ public class WindowExample implements StreamApplication {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    WindowExample app = new WindowExample();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new WindowExample(), config);
 
-    runner.run(app);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -65,11 +65,10 @@ public class WindowExample implements StreamApplication {
     KafkaOutputDescriptor<Integer> outputStreamDescriptor =
         trackingSystem.getOutputDescriptor("pageViewEventPerMember", new IntegerSerde());
 
-    MessageStream<PageViewEvent> inputStream = graph.getInputStream(inputStreamDescriptor);
-    OutputStream<Integer> outputStream = graph.getOutputStream(outputStreamDescriptor);
-
     SupplierFunction<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
+    MessageStream<PageViewEvent> inputStream = appDesc.getInputStream(inputStreamDescriptor);
+    OutputStream<Integer> outputStream = appDesc.getOutputStream(outputStreamDescriptor);
 
     // create a tumbling window that outputs the number of message collected every 10 minutes.
     // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
@@ -80,7 +79,6 @@ public class WindowExample implements StreamApplication {
                 Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
         .map(WindowPane::getMessage)
         .sendTo(outputStream);
-
   }
 
   class PageViewEvent {

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
index a282dbb..2e27a4c 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
@@ -34,7 +34,7 @@ import org.apache.samza.util.Clock;
  * MockSystemConsumer is a class that simulates a multi-threaded consumer that
  * uses BlockingEnvelopeMap. The primary use for this class is to do performance
  * testing.
- * 
+ *
  * This class works by starting up (threadCount) threads. Each thread adds
  * (messagesPerBatch) to the BlockingEnvelopeMap, then sleeps for
  * (brokerSleepMs). The sleep is important to simulate network latency when
@@ -57,7 +57,7 @@ public class MockSystemConsumer extends BlockingEnvelopeMap {
   private List<Thread> threads;
 
   /**
-   * 
+   *
    * @param messagesPerBatch
    *          The number of messages to add to the BlockingEnvelopeMap before
    *          sleeping.

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 5ca497a..477d5b8 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -31,7 +31,9 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.InMemorySystemConfig;
 import org.apache.samza.config.JobConfig;
@@ -56,7 +58,10 @@ import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
 import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskFactory;
 import org.apache.samza.test.framework.stream.CollectionStream;
 import org.apache.samza.test.framework.system.CollectionStreamSystemSpec;
 import org.junit.Assert;
@@ -284,17 +289,13 @@ public class TestRunner {
   public void run(Duration timeout) {
     Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null),
         "TestRunner should run for Low Level Task api or High Level Application Api");
-    Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(),
-        "Timeouts should be positive");
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
-    if (app == null) {
-      runner.runTask();
-    } else {
-      runner.run(app);
-    }
+    Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive");
+    SamzaApplication testApp = app == null ? (TaskApplication) appDesc -> appDesc.setTaskFactory(createTaskFactory()) : app;
+    final LocalApplicationRunner runner = new LocalApplicationRunner(testApp, new MapConfig(configs));
+    runner.run();
     boolean timedOut = !runner.waitForFinish(timeout);
     Assert.assertFalse("Timed out waiting for application to finish", timedOut);
-    ApplicationStatus status = runner.status(app);
+    ApplicationStatus status = runner.status();
     if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) {
       throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable()));
     }
@@ -364,4 +365,26 @@ public class TestRunner {
         .collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(),
             entry -> entry.getValue().stream().map(e -> (T) e.getMessage()).collect(Collectors.toList())));
   }
+
+  private TaskFactory createTaskFactory() {
+    if (StreamTask.class.isAssignableFrom(taskClass)) {
+      return (StreamTaskFactory) () -> {
+        try {
+          return (StreamTask) taskClass.newInstance();
+        } catch (InstantiationException | IllegalAccessException e) {
+          throw new SamzaException(String.format("Failed to instantiate StreamTask class %s", taskClass.getName()), e);
+        }
+      };
+    } else if (AsyncStreamTask.class.isAssignableFrom(taskClass)) {
+      return (AsyncStreamTaskFactory) () -> {
+        try {
+          return (AsyncStreamTask) taskClass.newInstance();
+        } catch (InstantiationException | IllegalAccessException e) {
+          throw new SamzaException(String.format("Failed to instantiate AsyncStreamTask class %s", taskClass.getName()), e);
+        }
+      };
+    }
+    throw new SamzaException(String.format("Not supported task.class %s. task.class has to implement either StreamTask "
+        + "or AsyncStreamTask", taskClass.getName()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
index e8be592..34b264f 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
@@ -20,19 +20,18 @@
 package org.apache.samza.test.integration;
 
 import joptsimple.OptionSet;
-import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.config.Config;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunnerMain;
-import org.apache.samza.runtime.ApplicationRunnerOperation;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.samza.runtime.ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG;
-
 /**
- * {@link ApplicationRunnerMain} was designed for deploying {@link StreamApplication} in yarn
+ * {@link ApplicationRunnerMain} was designed for deploying {@link SamzaApplication} in yarn
  * and doesn't work for in standalone.
  *
  * This runner class is built for standalone failure tests and not recommended for general use.
@@ -47,17 +46,15 @@ public class LocalApplicationRunnerMain {
     Config orgConfig = cmdLine.loadConfig(options);
     Config config = Util.rewriteConfig(orgConfig);
 
-    ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-    StreamApplication app = (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();
-
-    ApplicationRunnerOperation op = cmdLine.getOperation(options);
+    SamzaApplication app = ApplicationUtil.fromConfig(config);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
 
     try {
       LOGGER.info("Launching stream application: {} to run.", app);
-      runner.run(app);
+      runner.run();
       runner.waitForFinish();
     } catch (Exception e) {
-      LOGGER.error("Exception occurred when invoking: {} on application: {}.", op, app, e);
+      LOGGER.error("Exception occurred when running application: {}.", app, e);
     }
   }
 }