You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/07 06:36:09 UTC
[3/9] samza git commit: SAMZA-1789: unify ApplicationDescriptor and
ApplicationRunner for high- and low-level APIs in YARN and standalone
environment
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 6df3421..5aaad26 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -283,7 +283,7 @@ class JoinTranslator {
// Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
// message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
- context.getStreamGraph().getTable(sourceConfig.getTableDescriptor().get());
+ context.getStreamAppDescriptor().getTable(sourceConfig.getTableDescriptor().get());
relOutputStream
.map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index c422130..fe4d8da 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.translator;
import java.util.Map;
import java.util.Optional;
-
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelShuttleImpl;
@@ -30,37 +29,37 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
-import org.apache.samza.SamzaException;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.descriptors.GenericOutputDescriptor;;
import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.table.Table;
+import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class is used to populate the StreamGraph using the SQL queries.
+ * This class is used to populate the {@link StreamApplicationDescriptor} using the SQL queries.
* This class contains the core of the SamzaSQL control code that converts the SQL statements to calcite relational graph.
- * It then walks the relational graph and then populates the Samza's {@link StreamGraph} accordingly.
+ * It then walks the relational graph and populates Samza's {@link StreamApplicationDescriptor} accordingly.
*/
public class QueryTranslator {
private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);
@@ -96,13 +95,13 @@ public class QueryTranslator {
this.converters = sqlConfig.getSamzaRelConverters();
}
- public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamGraph streamGraph) {
+ public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) {
QueryPlanner planner =
new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
sqlConfig.getUdfMetadata());
final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig);
final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
- final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext, this.converters);
+ final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
final RelNode node = relRoot.project();
final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
@@ -159,9 +158,9 @@ public class QueryTranslator {
String systemName = sinkConfig.getSystemName();
DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
- outputStream.sendTo(streamGraph.getOutputStream(osd));
+ outputStream.sendTo(appDesc.getOutputStream(osd));
} else {
- Table outputTable = streamGraph.getTable(tableDescriptor.get());
+ Table outputTable = appDesc.getTable(tableDescriptor.get());
if (outputTable == null) {
String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
LOG.error(msg);
@@ -170,7 +169,7 @@ public class QueryTranslator {
outputStream.sendTo(outputTable);
}
- streamGraph.withContextManager(new ContextManager() {
+ appDesc.withContextManager(new ContextManager() {
@Override
public void init(Config config, TaskContext taskContext) {
taskContext.setUserContext(context.clone());
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 46e0840..2dc28be 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -23,10 +23,10 @@ import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.core.TableScan;
import org.apache.commons.lang.Validate;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.operators.functions.MapFunction;
@@ -34,8 +34,8 @@ import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.task.TaskContext;
/**
@@ -73,7 +73,7 @@ class ScanTranslator {
}
void translate(final TableScan tableScan, final TranslatorContext context) {
- StreamGraph streamGraph = context.getStreamGraph();
+ StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
List<String> tableNameParts = tableScan.getTable().getQualifiedName();
String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
@@ -85,7 +85,7 @@ class ScanTranslator {
KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde);
- MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(isd);
+ MessageStream<KV<Object, Object>> inputStream = streamAppDesc.getInputStream(isd);
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName));
context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index e622d55..a7ab663 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.translator;
import java.util.HashMap;
import java.util.Map;
-
import java.util.TimeZone;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -33,8 +32,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
@@ -42,13 +41,13 @@ import org.apache.samza.sql.interfaces.SamzaRelConverter;
/**
- * State that is maintained while translating the Calcite relational graph to Samza {@link StreamGraph}.
+ * State that is maintained while translating the Calcite relational graph to Samza {@link StreamApplicationDescriptor}.
*/
public class TranslatorContext implements Cloneable {
/**
* The internal variables that are shared among all cloned {@link TranslatorContext}
*/
- private final StreamGraph streamGraph;
+ private final StreamApplicationDescriptor streamAppDesc;
private final RexToJavaCompiler compiler;
private final Map<String, SamzaRelConverter> relSamzaConverters;
private final Map<Integer, MessageStream> messageStreams;
@@ -122,7 +121,7 @@ public class TranslatorContext implements Cloneable {
* @param other the original object to copy from
*/
private TranslatorContext(TranslatorContext other) {
- this.streamGraph = other.streamGraph;
+ this.streamAppDesc = other.streamAppDesc;
this.compiler = other.compiler;
this.relSamzaConverters = other.relSamzaConverters;
this.messageStreams = other.messageStreams;
@@ -134,13 +133,13 @@ public class TranslatorContext implements Cloneable {
/**
* Create the instance of TranslatorContext
- * @param streamGraph Samza's streamGraph that is populated during the translation.
+ * @param stramAppDesc Samza's {@link StreamApplicationDescriptor} that is populated during the translation.
* @param relRoot Root of the relational graph from calcite.
* @param executionContext the execution context
* @param converters the map of schema to RelData converters
*/
- TranslatorContext(StreamGraph streamGraph, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) {
- this.streamGraph = streamGraph;
+ TranslatorContext(StreamApplicationDescriptor stramAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) {
+ this.streamAppDesc = stramAppDesc;
this.compiler = createExpressionCompiler(relRoot);
this.executionContext = executionContext;
this.dataContext = new DataContextImpl();
@@ -155,8 +154,8 @@ public class TranslatorContext implements Cloneable {
*
* @return the stream graph
*/
- public StreamGraph getStreamGraph() {
- return streamGraph;
+ public StreamApplicationDescriptor getStreamAppDescriptor() {
+ return streamAppDesc;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
index c4cacbd..cc339f1 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
@@ -45,8 +45,8 @@ public class TestSamzaSqlTable {
String sql1 = "Insert into testDb.testTable.`$table` select id, name from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
- SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
- runner.runAndWaitForFinish();
+ SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+ appRunnable.runAndWaitForFinish();
Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size());
}
@@ -61,8 +61,8 @@ public class TestSamzaSqlTable {
String sql1 = "Insert into testDb.testTable.`$table` select id __key__, name from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
- SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
- runner.runAndWaitForFinish();
+ SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+ appRunnable.runAndWaitForFinish();
Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index b6dcac5..9fab5d5 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -25,11 +25,9 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
import org.junit.Assert;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index 88ce443..e7c2195 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -25,12 +25,12 @@ import java.util.HashSet;
import org.apache.calcite.DataContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
@@ -73,7 +73,7 @@ public class TestFilterTranslator extends TranslatorTestBase {
when(mockFilter.getInput()).thenReturn(mockInput);
when(mockInput.getId()).thenReturn(1);
when(mockFilter.getId()).thenReturn(2);
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
@@ -95,7 +95,7 @@ public class TestFilterTranslator extends TranslatorTestBase {
assertNotNull(filterSpec);
assertEquals(filterSpec.getOpCode(), OperatorSpec.OpCode.FILTER);
- // Verify that the init() method will establish the context for the filter function
+ // Verify that the describe() method will establish the context for the filter function
Config mockConfig = mock(Config.class);
TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
new HashSet<>(), null, null, null, null, null, null);
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 7395a3d..f0a8a89 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -33,9 +33,9 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -132,22 +132,22 @@ public class TestJoinTranslator extends TranslatorTestBase {
when(mockRightInput.getRowType()).thenReturn(mockRightRowType);
when(mockRightRowType.getFieldNames()).thenReturn(rightStreamFieldNames);
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec<Object, SamzaSqlRelMessage> mockLeftInputOp = mock(OperatorSpec.class);
- MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockGraph, mockLeftInputOp);
+ MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockAppDesc, mockLeftInputOp);
when(mockContext.getMessageStream(eq(mockLeftInput.getId()))).thenReturn(mockLeftInputStream);
OperatorSpec<Object, SamzaSqlRelMessage> mockRightInputOp = mock(OperatorSpec.class);
- MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockGraph, mockRightInputOp);
+ MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockAppDesc, mockRightInputOp);
when(mockContext.getMessageStream(eq(mockRightInput.getId()))).thenReturn(mockRightInputStream);
- when(mockContext.getStreamGraph()).thenReturn(mockGraph);
+ when(mockContext.getStreamAppDescriptor()).thenReturn(mockAppDesc);
InputOperatorSpec mockInputOp = mock(InputOperatorSpec.class);
OutputStreamImpl mockOutputStream = mock(OutputStreamImpl.class);
when(mockInputOp.isKeyed()).thenReturn(true);
when(mockOutputStream.isKeyed()).thenReturn(true);
IntermediateMessageStreamImpl
- mockPartitionedStream = new IntermediateMessageStreamImpl(mockGraph, mockInputOp, mockOutputStream);
- when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream);
+ mockPartitionedStream = new IntermediateMessageStreamImpl(mockAppDesc, mockInputOp, mockOutputStream);
+ when(mockAppDesc.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream);
doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class));
RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
@@ -155,7 +155,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
Expression mockExpr = mock(Expression.class);
when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
- doAnswer(this.getRegisteredTableAnswer()).when(mockGraph).getTable(any(RocksDbTableDescriptor.class));
+ doAnswer(this.getRegisteredTableAnswer()).when(mockAppDesc).getTable(any(RocksDbTableDescriptor.class));
when(mockJoin.getJoinType()).thenReturn(JoinRelType.INNER);
SqlIOResolver mockResolver = mock(SqlIOResolver.class);
SqlIOConfig mockIOConfig = mock(SqlIOConfig.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index f84dd3f..1acfc47 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -32,12 +32,12 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.calcite.util.Pair;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.TimerFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
@@ -91,9 +91,9 @@ public class TestProjectTranslator extends TranslatorTestBase {
List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
namedProjects.add(Pair.of(mockRexField, "test_field"));
when(mockProject.getNamedProjects()).thenReturn(namedProjects);
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
- MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+ MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
@@ -113,7 +113,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
assertNotNull(projectSpec);
assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
- // Verify that the init() method will establish the context for the map function
+ // Verify that the bootstrap() method will establish the context for the map function
Config mockConfig = mock(Config.class);
TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
new HashSet<>(), null, null, null, null, null, null);
@@ -183,7 +183,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
flattenProjects.add(mockFlattenProject);
when(mockProject.getProjects()).thenReturn(flattenProjects);
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = new OperatorSpec(OperatorSpec.OpCode.INPUT, "1") {
@Override
@@ -197,7 +197,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
}
};
- MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+ MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
@@ -248,7 +248,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
assertNotNull(projectSpec);
assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
- // Verify that the init() method will establish the context for the map function
+ // Verify that the describe() method will establish the context for the map function
Config mockConfig = mock(Config.class);
TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
new HashSet<>(), null, null, null, null, null, null);
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 1776067..c9f59e6 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -19,20 +19,20 @@
package org.apache.samza.sql.translator;
-import java.util.HashSet;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
@@ -49,8 +49,7 @@ public class TestQueryTranslator {
private void validateClonedTranslatorContext(TranslatorContext originContext, TranslatorContext clonedContext) {
Assert.assertNotEquals(originContext, clonedContext);
Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler());
- Assert.assertTrue(originContext.getStreamGraph() == clonedContext.getStreamGraph());
- Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler());
+ Assert.assertTrue(originContext.getStreamAppDescriptor() == clonedContext.getStreamAppDescriptor());
Assert.assertTrue(Whitebox.getInternalState(originContext, "relSamzaConverters") == Whitebox.getInternalState(clonedContext, "relSamzaConverters"));
Assert.assertTrue(Whitebox.getInternalState(originContext, "messageStreams") == Whitebox.getInternalState(clonedContext, "messageStreams"));
Assert.assertTrue(Whitebox.getInternalState(originContext, "relNodes") == Whitebox.getInternalState(clonedContext, "relNodes"));
@@ -85,10 +84,10 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
- OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+ StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(descriptor -> { },samzaConfig);
+
+ translator.translate(queryInfo, appDesc);
+ OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -97,29 +96,29 @@ public class TestQueryTranslator {
String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
String outputSystem = streamConfig.getSystem(outputStreamId);
String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
-
+
Assert.assertEquals(1, specGraph.getOutputStreams().size());
Assert.assertEquals("testavro", outputSystem);
Assert.assertEquals("outputTopic", outputPhysicalName);
Assert.assertEquals(1, specGraph.getInputOperators().size());
-
+
Assert.assertEquals("testavro", inputSystem);
Assert.assertEquals("SIMPLE1", inputPhysicalName);
- validatePerTaskContextInit(graphSpec, samzaConfig);
+ validatePerTaskContextInit(appDesc, samzaConfig);
}
- private void validatePerTaskContextInit(StreamGraphSpec graphSpec, Config samzaConfig) {
+ private void validatePerTaskContextInit(StreamApplicationDescriptorImpl appDesc, Config samzaConfig) {
// make sure that each task context would have a separate instance of cloned TranslatorContext
TaskContextImpl testContext = new TaskContextImpl(new TaskName("Partition 1"), null, null,
new HashSet<>(), null, null, null, null, null, null);
- // call ContextManager.init() to instantiate the per-task TranslatorContext
- graphSpec.getContextManager().init(samzaConfig, testContext);
+ // call ContextManager.init() to instantiate the per-task TranslatorContext
+ appDesc.getContextManager().init(samzaConfig, testContext);
Assert.assertNotNull(testContext.getUserContext());
Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext);
TranslatorContext contextPerTaskOne = (TranslatorContext) testContext.getUserContext();
- // call ContextManager.init() second time to instantiate another clone of TranslatorContext
- graphSpec.getContextManager().init(samzaConfig, testContext);
+ // call ContextManager.init() a second time to instantiate another clone of TranslatorContext
+ appDesc.getContextManager().init(samzaConfig, testContext);
Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext);
// validate the two copies of TranslatorContext are clones of each other
validateClonedTranslatorContext(contextPerTaskOne, (TranslatorContext) testContext.getUserContext());
@@ -137,9 +136,10 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
- OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+ translator.translate(queryInfo, streamAppDesc);
+ OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -156,7 +156,7 @@ public class TestQueryTranslator {
Assert.assertEquals("testavro", inputSystem);
Assert.assertEquals("COMPLEX1", inputPhysicalName);
- validatePerTaskContextInit(graphSpec, samzaConfig);
+ validatePerTaskContextInit(streamAppDesc, samzaConfig);
}
@Test
@@ -168,10 +168,10 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
- OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+ translator.translate(queryInfo, streamAppDesc);
+ OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -188,7 +188,7 @@ public class TestQueryTranslator {
Assert.assertEquals("testavro", inputSystem);
Assert.assertEquals("COMPLEX1", inputPhysicalName);
- validatePerTaskContextInit(graphSpec, samzaConfig);
+ validatePerTaskContextInit(streamAppDesc, samzaConfig);
}
@Test (expected = SamzaException.class)
@@ -204,9 +204,9 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -223,9 +223,9 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = IllegalStateException.class)
@@ -242,9 +242,9 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -261,9 +261,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -278,9 +277,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -297,9 +295,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -317,9 +314,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -336,9 +332,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -355,9 +350,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -374,9 +368,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -393,9 +386,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -416,9 +408,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test (expected = SamzaException.class)
@@ -435,9 +426,8 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
@Test
@@ -454,10 +444,10 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
- OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+ translator.translate(queryInfo, streamAppDesc);
+ OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -490,7 +480,7 @@ public class TestQueryTranslator {
Assert.assertEquals("kafka", input3System);
Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
- validatePerTaskContextInit(graphSpec, samzaConfig);
+ validatePerTaskContextInit(streamAppDesc, samzaConfig);
}
@Test
@@ -507,11 +497,11 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
- OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+ translator.translate(queryInfo, streamAppDesc);
+
+ OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -544,7 +534,7 @@ public class TestQueryTranslator {
Assert.assertEquals("kafka", input3System);
Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
- validatePerTaskContextInit(graphSpec, samzaConfig);
+ validatePerTaskContextInit(streamAppDesc, samzaConfig);
}
@Test
@@ -561,11 +551,10 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
- OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+ OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
StreamConfig streamConfig = new StreamConfig(samzaConfig);
String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
@@ -598,7 +587,7 @@ public class TestQueryTranslator {
Assert.assertEquals("kafka", input3System);
Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
- validatePerTaskContextInit(graphSpec, samzaConfig);
+ validatePerTaskContextInit(streamAppDesc, samzaConfig);
}
@Test
@@ -615,10 +604,10 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
- OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+
+ translator.translate(queryInfo, streamAppDesc);
+ OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
Assert.assertEquals(1, specGraph.getInputOperators().size());
Assert.assertEquals(1, specGraph.getOutputStreams().size());
@@ -639,8 +628,7 @@ public class TestQueryTranslator {
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
- StreamGraphSpec
- graphSpec = new StreamGraphSpec(samzaConfig);
- translator.translate(queryInfo, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+ translator.translate(queryInfo, streamAppDesc);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
index d7f805d..7d5e0d2 100644
--- a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
@@ -19,15 +19,17 @@
package org.apache.samza.example;
import java.time.Duration;
+import java.util.HashMap;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -46,14 +48,14 @@ public class AppWithGlobalConfigExample implements StreamApplication {
public static void main(String[] args) {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- LocalApplicationRunner runner = new LocalApplicationRunner(config);
- AppWithGlobalConfigExample app = new AppWithGlobalConfigExample();
- runner.run(app);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new AppWithGlobalConfigExample(), config);
+
+ runner.run();
runner.waitForFinish();
}
@Override
- public void init(StreamGraph graph, Config config) {
+ public void describe(StreamApplicationDescriptor appDesc) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -63,12 +65,15 @@ public class AppWithGlobalConfigExample implements StreamApplication {
trackingSystem.getOutputDescriptor("pageViewEventPerMember",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
- graph.getInputStream(inputStreamDescriptor)
- .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, null, null)
+ appDesc.getInputStream(inputStreamDescriptor)
+ .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1,
+ null, null)
.setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
- .setAccumulationMode(AccumulationMode.DISCARDING), "w1")
+ .setAccumulationMode(AccumulationMode.DISCARDING), "window1")
.map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m)))
- .sendTo(graph.getOutputStream(outputStreamDescriptor));
+ .sendTo(appDesc.getOutputStream(outputStreamDescriptor));
+
+ appDesc.withMetricsReporterFactories(new HashMap<>());
}
class PageViewEvent {
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
index 1c1b4be..4ef2402 100644
--- a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -20,11 +20,12 @@
package org.apache.samza.example;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -43,16 +44,13 @@ public class BroadcastExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-
- StreamApplication app = new BroadcastExample();
- LocalApplicationRunner runner = new LocalApplicationRunner(config);
-
- runner.run(app);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new BroadcastExample(), config);
+ runner.run();
runner.waitForFinish();
}
@Override
- public void init(StreamGraph graph, Config config) {
+ public void describe(StreamApplicationDescriptor appDesc) {
KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<KV<String, PageViewEvent>> pageViewEvent =
@@ -64,10 +62,10 @@ public class BroadcastExample implements StreamApplication {
KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream3 =
trackingSystem.getOutputDescriptor("outStream3", serde);
- MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream(pageViewEvent);
- inputStream.filter(m -> m.key.equals("key1")).sendTo(graph.getOutputStream(outStream1));
- inputStream.filter(m -> m.key.equals("key2")).sendTo(graph.getOutputStream(outStream2));
- inputStream.filter(m -> m.key.equals("key3")).sendTo(graph.getOutputStream(outStream3));
+ MessageStream<KV<String, PageViewEvent>> inputStream = appDesc.getInputStream(pageViewEvent);
+ inputStream.filter(m -> m.key.equals("key1")).sendTo(appDesc.getOutputStream(outStream1));
+ inputStream.filter(m -> m.key.equals("key2")).sendTo(appDesc.getOutputStream(outStream2));
+ inputStream.filter(m -> m.key.equals("key3")).sendTo(appDesc.getOutputStream(outStream3));
}
class PageViewEvent {
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
index 4d3307b..dfc4b42 100644
--- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -18,19 +18,19 @@
*/
package org.apache.samza.example;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -51,15 +51,14 @@ public class KeyValueStoreExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- KeyValueStoreExample app = new KeyValueStoreExample();
- LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new KeyValueStoreExample(), config);
- runner.run(app);
+ runner.run();
runner.waitForFinish();
}
@Override
- public void init(StreamGraph graph, Config config) {
+ public void describe(StreamApplicationDescriptor appDesc) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -69,9 +68,9 @@ public class KeyValueStoreExample implements StreamApplication {
trackingSystem.getOutputDescriptor("pageViewEventPerMember",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
- graph.setDefaultSystem(trackingSystem);
- MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
- OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor);
+ appDesc.withDefaultSystem(trackingSystem);
+ MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
+ OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor);
pageViewEvents
.partitionBy(pve -> pve.memberId, pve -> pve,
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
index 33d60d6..fe018f3 100644
--- a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -22,11 +22,12 @@ package org.apache.samza.example;
import com.google.common.collect.ImmutableList;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.operators.KV;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -41,15 +42,14 @@ public class MergeExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- MergeExample app = new MergeExample();
- LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new MergeExample(), config);
- runner.run(app);
+ runner.run();
runner.waitForFinish();
}
@Override
- public void init(StreamGraph graph, Config config) {
+ public void describe(StreamApplicationDescriptor appDesc) {
KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
@@ -64,8 +64,8 @@ public class MergeExample implements StreamApplication {
trackingSystem.getOutputDescriptor("mergedStream", serde);
MessageStream
- .mergeAll(ImmutableList.of(graph.getInputStream(isd1), graph.getInputStream(isd2), graph.getInputStream(isd3)))
- .sendTo(graph.getOutputStream(osd));
+ .mergeAll(ImmutableList.of(appDesc.getInputStream(isd1), appDesc.getInputStream(isd2), appDesc.getInputStream(isd3)))
+ .sendTo(appDesc.getOutputStream(osd));
}
class PageViewEvent {
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 34b5fc6..8d3812b 100644
--- a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -20,11 +20,12 @@ package org.apache.samza.example;
import java.time.Duration;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -43,15 +44,13 @@ public class OrderShipmentJoinExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- OrderShipmentJoinExample app = new OrderShipmentJoinExample();
- LocalApplicationRunner runner = new LocalApplicationRunner(config);
-
- runner.run(app);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new OrderShipmentJoinExample(), config);
+ runner.run();
runner.waitForFinish();
}
@Override
- public void init(StreamGraph graph, Config config) {
+ public void describe(StreamApplicationDescriptor appDesc) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<OrderRecord> orderStreamDescriptor =
@@ -62,12 +61,12 @@ public class OrderShipmentJoinExample implements StreamApplication {
trackingSystem.getOutputDescriptor("fulfilledOrders",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
- graph.getInputStream(orderStreamDescriptor)
- .join(graph.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
+ appDesc.getInputStream(orderStreamDescriptor)
+ .join(appDesc.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
Duration.ofMinutes(1), "join")
.map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
- .sendTo(graph.getOutputStream(fulfilledOrdersStreamDescriptor));
+ .sendTo(appDesc.getOutputStream(fulfilledOrdersStreamDescriptor));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index dc5eb74..b540585 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -19,19 +19,20 @@
package org.apache.samza.example;
import java.time.Duration;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.SupplierFunction;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -51,14 +52,14 @@ public class PageViewCounterExample implements StreamApplication {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
PageViewCounterExample app = new PageViewCounterExample();
- LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
- runner.run(app);
+ runner.run();
runner.waitForFinish();
}
@Override
- public void init(StreamGraph graph, Config config) {
+ public void describe(StreamApplicationDescriptor appDesc) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -68,8 +69,8 @@ public class PageViewCounterExample implements StreamApplication {
trackingSystem.getOutputDescriptor("pageViewEventPerMember",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
- MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
- OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = graph.getOutputStream(outputStreamDescriptor);
+ MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
+ OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = appDesc.getOutputStream(outputStreamDescriptor);
SupplierFunction<Integer> initialValue = () -> 0;
FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
index b776c7d..8a0ca28 100644
--- a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -20,14 +20,15 @@ package org.apache.samza.example;
import java.time.Duration;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
@@ -46,15 +47,14 @@ public class RepartitionExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- RepartitionExample app = new RepartitionExample();
- LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new RepartitionExample(), config);
- runner.run(app);
+ runner.run();
runner.waitForFinish();
}
@Override
- public void init(StreamGraph graph, Config config) {
+ public void describe(StreamApplicationDescriptor appDesc) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -64,9 +64,9 @@ public class RepartitionExample implements StreamApplication {
trackingSystem.getOutputDescriptor("pageViewEventPerMember",
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
- graph.setDefaultSystem(trackingSystem);
- MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
- OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor);
+ appDesc.withDefaultSystem(trackingSystem);
+ MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
+ OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor);
pageViewEvents
.partitionBy(pve -> pve.memberId, pve -> pve,
@@ -75,7 +75,6 @@ public class RepartitionExample implements StreamApplication {
KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), "window")
.map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
.sendTo(pageViewEventPerMember);
-
}
class PageViewEvent {
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
new file mode 100644
index 0000000..73dc10a
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example low-level API application (i.e. {@link TaskApplication}) with one input stream, one output stream and a table.
+ */
+public class TaskApplicationExample implements TaskApplication {
+
+ public class MyStreamTask implements StreamTask { // NOTE(review): non-static inner class, so each instance is tied to an enclosing TaskApplicationExample; consider making it static
+
+ @Override
+ public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
+ throws Exception {
+ // placeholder: this example task intentionally does nothing with the incoming message
+ }
+ }
+
+ public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); // config is loaded from the command-line arguments
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TaskApplicationExample(), config);
+ runner.run();
+ runner.waitForFinish(); // presumably blocks until the application finishes; confirm against ApplicationRunner
+ }
+
+ @Override
+ public void describe(TaskApplicationDescriptor appDesc) {
+ // describe the "tracking" Kafka system, its input/output streams and the "mytable" table
+ KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("tracking");
+ KafkaInputDescriptor<String> isd = ksd.getInputDescriptor("myinput", new StringSerde());
+ KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor("myout", new StringSerde());
+ TableDescriptor td = new RocksDbTableDescriptor("mytable");
+
+ appDesc.addInputStream(isd);
+ appDesc.addOutputStream(osd);
+ appDesc.addTable(td);
+ // register a factory that returns a new MyStreamTask each time it is invoked (no configuration is read here)
+ appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
index cbc1e8e..2f4c19c 100644
--- a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -21,16 +21,17 @@ package org.apache.samza.example;
import java.time.Duration;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.SupplierFunction;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.kafka.KafkaInputDescriptor;
@@ -49,15 +50,14 @@ public class WindowExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- WindowExample app = new WindowExample();
- LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new WindowExample(), config);
- runner.run(app);
+ runner.run();
runner.waitForFinish();
}
@Override
- public void init(StreamGraph graph, Config config) {
+ public void describe(StreamApplicationDescriptor appDesc) {
KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -65,11 +65,10 @@ public class WindowExample implements StreamApplication {
KafkaOutputDescriptor<Integer> outputStreamDescriptor =
trackingSystem.getOutputDescriptor("pageViewEventPerMember", new IntegerSerde());
- MessageStream<PageViewEvent> inputStream = graph.getInputStream(inputStreamDescriptor);
- OutputStream<Integer> outputStream = graph.getOutputStream(outputStreamDescriptor);
-
SupplierFunction<Integer> initialValue = () -> 0;
FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
+ MessageStream<PageViewEvent> inputStream = appDesc.getInputStream(inputStreamDescriptor);
+ OutputStream<Integer> outputStream = appDesc.getOutputStream(outputStreamDescriptor);
// create a tumbling window that outputs the number of message collected every 10 minutes.
// also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
@@ -80,7 +79,6 @@ public class WindowExample implements StreamApplication {
Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
.map(WindowPane::getMessage)
.sendTo(outputStream);
-
}
class PageViewEvent {
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
index a282dbb..2e27a4c 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
@@ -34,7 +34,7 @@ import org.apache.samza.util.Clock;
* MockSystemConsumer is a class that simulates a multi-threaded consumer that
* uses BlockingEnvelopeMap. The primary use for this class is to do performance
* testing.
- *
+ *
* This class works by starting up (threadCount) threads. Each thread adds
* (messagesPerBatch) to the BlockingEnvelopeMap, then sleeps for
* (brokerSleepMs). The sleep is important to simulate network latency when
@@ -57,7 +57,7 @@ public class MockSystemConsumer extends BlockingEnvelopeMap {
private List<Thread> threads;
/**
- *
+ *
* @param messagesPerBatch
* The number of messages to add to the BlockingEnvelopeMap before
* sleeping.
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 5ca497a..477d5b8 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -31,7 +31,9 @@ import java.util.stream.Collectors;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.InMemorySystemConfig;
import org.apache.samza.config.JobConfig;
@@ -56,7 +58,10 @@ import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskFactory;
import org.apache.samza.test.framework.stream.CollectionStream;
import org.apache.samza.test.framework.system.CollectionStreamSystemSpec;
import org.junit.Assert;
@@ -284,17 +289,13 @@ public class TestRunner {
public void run(Duration timeout) {
Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null),
"TestRunner should run for Low Level Task api or High Level Application Api");
- Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(),
- "Timeouts should be positive");
- final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
- if (app == null) {
- runner.runTask();
- } else {
- runner.run(app);
- }
+ Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive");
+ SamzaApplication testApp = app == null ? (TaskApplication) appDesc -> appDesc.setTaskFactory(createTaskFactory()) : app;
+ final LocalApplicationRunner runner = new LocalApplicationRunner(testApp, new MapConfig(configs));
+ runner.run();
boolean timedOut = !runner.waitForFinish(timeout);
Assert.assertFalse("Timed out waiting for application to finish", timedOut);
- ApplicationStatus status = runner.status(app);
+ ApplicationStatus status = runner.status();
if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) {
throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable()));
}
@@ -364,4 +365,26 @@ public class TestRunner {
.collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(),
entry -> entry.getValue().stream().map(e -> (T) e.getMessage()).collect(Collectors.toList())));
}
+
+ private TaskFactory createTaskFactory() { // builds the TaskFactory variant matching the interface implemented by taskClass
+ if (StreamTask.class.isAssignableFrom(taskClass)) { // checked first: a class implementing both interfaces gets a StreamTaskFactory
+ return (StreamTaskFactory) () -> { // instantiation is deferred until the factory is invoked
+ try {
+ return (StreamTask) taskClass.newInstance(); // reflective creation via Class.newInstance()
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new SamzaException(String.format("Failed to instantiate StreamTask class %s", taskClass.getName()), e); // cause is preserved
+ }
+ };
+ } else if (AsyncStreamTask.class.isAssignableFrom(taskClass)) {
+ return (AsyncStreamTaskFactory) () -> { // same pattern as above, for asynchronous tasks
+ try {
+ return (AsyncStreamTask) taskClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new SamzaException(String.format("Failed to instantiate AsyncStreamTask class %s", taskClass.getName()), e); // cause is preserved
+ }
+ };
+ }
+ throw new SamzaException(String.format("Not supported task.class %s. task.class has to implement either StreamTask "
+ + "or AsyncStreamTask", taskClass.getName())); // reached only when taskClass implements neither interface
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
index e8be592..34b264f 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
@@ -20,19 +20,18 @@
package org.apache.samza.test.integration;
import joptsimple.OptionSet;
-import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.config.Config;
-import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunnerMain;
-import org.apache.samza.runtime.ApplicationRunnerOperation;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.samza.runtime.ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG;
-
/**
- * {@link ApplicationRunnerMain} was designed for deploying {@link StreamApplication} in yarn
+ * {@link ApplicationRunnerMain} was designed for deploying {@link SamzaApplication} in yarn
* and doesn't work in standalone.
*
* This runner class is built for standalone failure tests and not recommended for general use.
@@ -47,17 +46,15 @@ public class LocalApplicationRunnerMain {
Config orgConfig = cmdLine.loadConfig(options);
Config config = Util.rewriteConfig(orgConfig);
- ApplicationRunner runner = ApplicationRunner.fromConfig(config);
- StreamApplication app = (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();
-
- ApplicationRunnerOperation op = cmdLine.getOperation(options);
+ SamzaApplication app = ApplicationUtil.fromConfig(config);
+ ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
try {
LOGGER.info("Launching stream application: {} to run.", app);
- runner.run(app);
+ runner.run();
runner.waitForFinish();
} catch (Exception e) {
- LOGGER.error("Exception occurred when invoking: {} on application: {}.", op, app, e);
+ LOGGER.error("Exception occurred when running application: {}.", app, e);
}
}
}