You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/05/25 16:36:46 UTC
[03/10] samza git commit: SAMZA-1659: Serializable OperatorSpec
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
new file mode 100644
index 0000000..be1e317
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class TestSamzaSqlQueryParser {
+ /** A single-source insert yields the sink, the select statement and exactly one source. */
+ @Test
+ public void testParseQuery() {
+ QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
+ Assert.assertEquals("log.foo", queryInfo.getSink());
+ Assert.assertEquals("select * from tracking.bar", queryInfo.getSelectQuery());
+ Assert.assertEquals(1, queryInfo.getSources().size());
+ Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
+ }
+
+ /** A join reports both sources in query order, with the back-quoted {@code $table} part unquoted. */
+ @Test
+ public void testParseJoinQuery() {
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic"
+ + " select p.name as profileName, pv.pageKey"
+ + " from testavro.PAGEVIEW as pv"
+ + " join testavro.PROFILE.`$table` as p"
+ + " on p.id = pv.profileId";
+ QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+ Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+ Assert.assertEquals(2, queryInfo.getSources().size());
+ Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+ Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
+ }
+
+ /** Each malformed statement must be rejected with a {@link SamzaException}. */
+ @Test
+ public void testParseInvalidQuery() {
+ // no "insert into" clause
+ assertParseFails("select * from tracking.bar");
+ // "insert into" without a sink name
+ assertParseFails("insert into select * from tracking.bar");
+ // "select" without a projection list
+ assertParseFails("insert into log.off select from tracking.bar");
+ }
+
+ /** Fails the test unless parsing {@code sql} throws a {@link SamzaException}. */
+ private static void assertParseFails(String sql) {
+ try {
+ SamzaSqlQueryParser.parseQuery(sql);
+ } catch (SamzaException expected) {
+ return;
+ }
+ Assert.fail("Expected a samzaException for query: " + sql);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
new file mode 100644
index 0000000..88ce443
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.RexToJavaCompiler;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link FilterTranslator}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LogicalFilter.class)
+public class TestFilterTranslator extends TranslatorTestBase {
+
+ /** Translates a mocked {@link LogicalFilter} and checks the registered FILTER operator and its filter function. */
+ @Test
+ public void testTranslate() throws IOException, ClassNotFoundException {
+ // set up the mocks that are passed to FilterTranslator.translate()
+ LogicalFilter mockFilter = PowerMockito.mock(LogicalFilter.class);
+ TranslatorContext mockContext = mock(TranslatorContext.class);
+ RelNode mockInput = mock(RelNode.class);
+ when(mockFilter.getInput()).thenReturn(mockInput);
+ when(mockInput.getId()).thenReturn(1);
+ when(mockFilter.getId()).thenReturn(2);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
+ MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+ when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
+ doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+ RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
+ when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+ Expression mockExpr = mock(Expression.class);
+ when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+
+ // Apply translate() method to verify that we are getting the correct filter operator constructed
+ FilterTranslator filterTranslator = new FilterTranslator();
+ filterTranslator.translate(mockFilter, mockContext);
+ // make sure that the LogicalFilter and the output message stream have been registered with the context
+ verify(mockContext, times(1)).registerRelNode(2, mockFilter);
+ verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+ when(mockContext.getRelNode(2)).thenReturn(mockFilter);
+ when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+ StreamOperatorSpec filterSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
+ assertNotNull(filterSpec);
+ assertEquals(OperatorSpec.OpCode.FILTER, filterSpec.getOpCode());
+
+ // Verify that the init() method will establish the context for the filter function
+ Config mockConfig = mock(Config.class);
+ TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
+ new HashSet<>(), null, null, null, null, null, null);
+ taskContext.setUserContext(mockContext);
+ filterSpec.getTransformFn().init(mockConfig, taskContext);
+ FilterFunction filterFn = (FilterFunction) Whitebox.getInternalState(filterSpec, "filterFn");
+ assertNotNull(filterFn);
+ assertEquals(mockContext, Whitebox.getInternalState(filterFn, "context"));
+ assertEquals(mockFilter, Whitebox.getInternalState(filterFn, "filter"));
+ assertEquals(mockExpr, Whitebox.getInternalState(filterFn, "expr"));
+
+ // Calling filterFn.apply() to verify the filter function is correctly applied to the input message
+ SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+ SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
+ DataContext dataContext = mock(DataContext.class);
+ when(mockContext.getExecutionContext()).thenReturn(executionContext);
+ when(mockContext.getDataContext()).thenReturn(dataContext);
+ // the stubbed execute() writes its verdict into slot 0 of the Object[] at argument index 3; eq(result) matches that array
+ Object[] result = new Object[1];
+
+ doAnswer(invocation -> {
+ Object[] retValue = invocation.getArgumentAt(3, Object[].class);
+ retValue[0] = Boolean.TRUE;
+ return null;
+ }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
+ eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
+ assertTrue(filterFn.apply(mockInputMsg));
+
+ doAnswer(invocation -> {
+ Object[] retValue = invocation.getArgumentAt(3, Object[].class);
+ retValue[0] = Boolean.FALSE;
+ return null;
+ }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
+ eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
+ assertFalse(filterFn.apply(mockInputMsg));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
new file mode 100644
index 0000000..2de4856
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -0,0 +1,191 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.samza.sql.translator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.RexToJavaCompiler;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link JoinTranslator}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LogicalJoin.class, EnumerableTableScan.class})
+public class TestJoinTranslator extends TranslatorTestBase {
+
+ /** Translates a mocked stream-table {@link LogicalJoin} and checks the registered JOIN operator and its join function. */
+ @Test
+ public void testTranslateStreamToTableJoin() throws IOException, ClassNotFoundException {
+ // set up the mocks that are passed to JoinTranslator.translate(); the left input is the table scan
+ LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
+ TranslatorContext mockContext = mock(TranslatorContext.class);
+ RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
+ RelNode mockRightInput = mock(RelNode.class);
+ List<RelNode> inputs = new ArrayList<>();
+ inputs.add(mockLeftInput);
+ inputs.add(mockRightInput);
+ RelOptTable mockLeftTable = mock(RelOptTable.class);
+ when(mockLeftInput.getTable()).thenReturn(mockLeftTable);
+ List<String> qualifiedTableName = new ArrayList<>();
+ qualifiedTableName.add("test");
+ qualifiedTableName.add("LeftTable");
+ when(mockLeftTable.getQualifiedName()).thenReturn(qualifiedTableName);
+ when(mockLeftInput.getId()).thenReturn(1);
+ when(mockRightInput.getId()).thenReturn(2);
+ when(mockJoin.getId()).thenReturn(3);
+ when(mockJoin.getInputs()).thenReturn(inputs);
+ when(mockJoin.getLeft()).thenReturn(mockLeftInput);
+ when(mockJoin.getRight()).thenReturn(mockRightInput);
+ // join condition: an EQUALS call whose two operands both reference field index 0
+ RexCall mockJoinCondition = mock(RexCall.class);
+ when(mockJoinCondition.isAlwaysTrue()).thenReturn(false);
+ when(mockJoinCondition.getKind()).thenReturn(SqlKind.EQUALS);
+ when(mockJoin.getCondition()).thenReturn(mockJoinCondition);
+ RexInputRef mockLeftConditionInput = mock(RexInputRef.class);
+ RexInputRef mockRightConditionInput = mock(RexInputRef.class);
+ when(mockLeftConditionInput.getIndex()).thenReturn(0);
+ when(mockRightConditionInput.getIndex()).thenReturn(0);
+ List<RexNode> condOperands = new ArrayList<>();
+ condOperands.add(mockLeftConditionInput);
+ condOperands.add(mockRightConditionInput);
+ when(mockJoinCondition.getOperands()).thenReturn(condOperands);
+ RelDataType mockLeftCondDataType = mock(RelDataType.class);
+ RelDataType mockRightCondDataType = mock(RelDataType.class);
+ when(mockLeftCondDataType.getSqlTypeName()).thenReturn(SqlTypeName.BOOLEAN);
+ when(mockRightCondDataType.getSqlTypeName()).thenReturn(SqlTypeName.BOOLEAN);
+ when(mockLeftConditionInput.getType()).thenReturn(mockLeftCondDataType);
+ when(mockRightConditionInput.getType()).thenReturn(mockRightCondDataType);
+ // row types and field names of the two inputs
+ RelDataType mockLeftRowType = mock(RelDataType.class);
+ // NOTE(review): the field count is stubbed to 0 although getFieldNames() below returns one name; confirm intent
+ when(mockLeftRowType.getFieldCount()).thenReturn(0);
+ when(mockLeftInput.getRowType()).thenReturn(mockLeftRowType);
+ List<String> leftFieldNames = new ArrayList<>();
+ leftFieldNames.add("test_table_field1");
+ List<String> rightStreamFieldNames = new ArrayList<>();
+ rightStreamFieldNames.add("test_stream_field1");
+ when(mockLeftRowType.getFieldNames()).thenReturn(leftFieldNames);
+ RelDataType mockRightRowType = mock(RelDataType.class);
+ when(mockRightInput.getRowType()).thenReturn(mockRightRowType);
+ when(mockRightRowType.getFieldNames()).thenReturn(rightStreamFieldNames);
+
+ // message streams that the context returns for the two inputs
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ OperatorSpec<Object, SamzaSqlRelMessage> mockLeftInputOp = mock(OperatorSpec.class);
+ MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockGraph, mockLeftInputOp);
+ when(mockContext.getMessageStream(eq(mockLeftInput.getId()))).thenReturn(mockLeftInputStream);
+ OperatorSpec<Object, SamzaSqlRelMessage> mockRightInputOp = mock(OperatorSpec.class);
+ MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockGraph, mockRightInputOp);
+ when(mockContext.getMessageStream(eq(mockRightInput.getId()))).thenReturn(mockRightInputStream);
+ when(mockContext.getStreamGraph()).thenReturn(mockGraph);
+
+ // keyed intermediate stream handed out by the mocked graph
+ InputOperatorSpec mockInputOp = mock(InputOperatorSpec.class);
+ OutputStreamImpl mockOutputStream = mock(OutputStreamImpl.class);
+ when(mockInputOp.isKeyed()).thenReturn(true);
+ when(mockOutputStream.isKeyed()).thenReturn(true);
+ IntermediateMessageStreamImpl mockPartitionedStream = new IntermediateMessageStreamImpl(mockGraph, mockInputOp, mockOutputStream);
+ when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class))).thenReturn(mockPartitionedStream);
+
+ doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class));
+ RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
+ when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+ Expression mockExpr = mock(Expression.class);
+ when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+
+ // table lookup on the mocked graph and IO resolution for the dot-joined qualified table name
+ doAnswer(this.getRegisteredTableAnswer()).when(mockGraph).getTable(any(RocksDbTableDescriptor.class));
+ when(mockJoin.getJoinType()).thenReturn(JoinRelType.INNER);
+ SqlIOResolver mockResolver = mock(SqlIOResolver.class);
+ SqlIOConfig mockIOConfig = mock(SqlIOConfig.class);
+ TableDescriptor mockTableDesc = mock(TableDescriptor.class);
+ when(mockResolver.fetchSourceInfo(String.join(".", qualifiedTableName))).thenReturn(mockIOConfig);
+ when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
+
+ // Apply translate() method to verify that we are getting the correct join operator constructed
+ JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver);
+ joinTranslator.translate(mockJoin, mockContext);
+ // make sure that the output message stream of the join has been registered with the context
+ verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
+ when(mockContext.getRelNode(3)).thenReturn(mockJoin);
+ when(mockContext.getMessageStream(3)).thenReturn(this.getRegisteredMessageStream(3));
+ StreamTableJoinOperatorSpec joinSpec = (StreamTableJoinOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(3), "operatorSpec");
+ assertNotNull(joinSpec);
+ assertEquals(OperatorSpec.OpCode.JOIN, joinSpec.getOpCode());
+
+ // Verify joinSpec has the corresponding setup
+ StreamTableJoinFunction joinFn = joinSpec.getJoinFn();
+ assertNotNull(joinFn);
+ assertEquals(Boolean.FALSE, Whitebox.getInternalState(joinFn, "isTablePosOnRight"));
+ List<Integer> expectedStreamFieldIds = new ArrayList<>();
+ expectedStreamFieldIds.add(0);
+ assertEquals(expectedStreamFieldIds, Whitebox.getInternalState(joinFn, "streamFieldIds"));
+ assertEquals(leftFieldNames, Whitebox.getInternalState(joinFn, "tableFieldNames"));
+ List<String> outputFieldNames = new ArrayList<>();
+ outputFieldNames.addAll(leftFieldNames);
+ outputFieldNames.addAll(rightStreamFieldNames);
+ assertEquals(outputFieldNames, Whitebox.getInternalState(joinFn, "outFieldNames"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
new file mode 100644
index 0000000..f84dd3f
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -0,0 +1,289 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.samza.sql.translator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.Pair;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.RexToJavaCompiler;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link ProjectTranslator}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LogicalProject.class)
+public class TestProjectTranslator extends TranslatorTestBase {
+ /** Translates a mocked {@link LogicalProject} and checks the registered MAP operator and its map function. */
+ @Test
+ public void testTranslate() throws IOException, ClassNotFoundException {
+ // set up the mocks that are passed to ProjectTranslator.translate()
+ LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
+ TranslatorContext mockContext = mock(TranslatorContext.class);
+ RelNode mockInput = mock(RelNode.class);
+ List<RelNode> inputs = new ArrayList<>();
+ inputs.add(mockInput);
+ when(mockInput.getId()).thenReturn(1);
+ when(mockProject.getId()).thenReturn(2);
+ when(mockProject.getInputs()).thenReturn(inputs);
+ when(mockProject.getInput()).thenReturn(mockInput);
+ RelDataType mockRowType = mock(RelDataType.class);
+ when(mockRowType.getFieldCount()).thenReturn(1);
+ when(mockProject.getRowType()).thenReturn(mockRowType);
+ RexNode mockRexField = mock(RexNode.class);
+ List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
+ namedProjects.add(Pair.of(mockRexField, "test_field"));
+ when(mockProject.getNamedProjects()).thenReturn(namedProjects);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
+ MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+ when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
+ doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+ RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
+ when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+ Expression mockExpr = mock(Expression.class);
+ when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+
+ // Apply translate() method to verify that we are getting the correct map operator constructed
+ ProjectTranslator projectTranslator = new ProjectTranslator();
+ projectTranslator.translate(mockProject, mockContext);
+ // make sure that the LogicalProject and the output message stream have been registered with the context
+ verify(mockContext, times(1)).registerRelNode(2, mockProject);
+ verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+ when(mockContext.getRelNode(2)).thenReturn(mockProject);
+ when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+ StreamOperatorSpec projectSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
+ assertNotNull(projectSpec);
+ assertEquals(OperatorSpec.OpCode.MAP, projectSpec.getOpCode());
+
+ // Verify that the init() method will establish the context for the map function
+ Config mockConfig = mock(Config.class);
+ TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
+ new HashSet<>(), null, null, null, null, null, null);
+ taskContext.setUserContext(mockContext);
+ projectSpec.getTransformFn().init(mockConfig, taskContext);
+ MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
+ assertNotNull(mapFn);
+ assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
+ assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
+ assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
+
+ // Calling mapFn.apply() to verify the map function is correctly applied to the input message
+ SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+ SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
+ DataContext dataContext = mock(DataContext.class);
+ when(mockContext.getExecutionContext()).thenReturn(executionContext);
+ when(mockContext.getDataContext()).thenReturn(dataContext);
+ Object[] result = new Object[1];
+ final Object mockFieldObj = new Object();
+
+ // the stubbed execute() writes the projected value into slot 0 of the Object[] at argument index 3
+ doAnswer(invocation -> {
+ Object[] retValue = invocation.getArgumentAt(3, Object[].class);
+ retValue[0] = mockFieldObj;
+ return null;
+ }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
+ eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
+ SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
+ List<String> expectedFieldNames = new ArrayList<>();
+ expectedFieldNames.add("test_field");
+ assertEquals(expectedFieldNames, retMsg.getSamzaSqlRelRecord().getFieldNames());
+ List<Object> expectedFieldValues = new ArrayList<>();
+ expectedFieldValues.add(mockFieldObj);
+ assertEquals(expectedFieldValues, retMsg.getSamzaSqlRelRecord().getFieldValues());
+ }
+
+ @Test
+ public void testTranslateWithFlatten() throws IOException, ClassNotFoundException {
+ // setup mock values to the constructor of ProjectTranslator
+ LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
+ TranslatorContext mockContext = mock(TranslatorContext.class);
+ RelNode mockInput = mock(RelNode.class);
+ List<RelNode> inputs = new ArrayList<>();
+ inputs.add(mockInput);
+ when(mockInput.getId()).thenReturn(1);
+ when(mockProject.getId()).thenReturn(2);
+ when(mockProject.getInputs()).thenReturn(inputs);
+ when(mockProject.getInput()).thenReturn(mockInput);
+ RelDataType mockRowType = mock(RelDataType.class);
+ when(mockRowType.getFieldCount()).thenReturn(1);
+ when(mockProject.getRowType()).thenReturn(mockRowType);
+ RexNode mockRexField = mock(RexNode.class);
+ List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
+ namedProjects.add(Pair.of(mockRexField, "test_field"));
+ when(mockProject.getNamedProjects()).thenReturn(namedProjects);
+ List<RexNode> flattenProjects = new ArrayList<>();
+ RexCall mockFlattenProject = mock(RexCall.class);
+ SqlUserDefinedFunction sqlFlattenUdf = mock(SqlUserDefinedFunction.class);
+ when(sqlFlattenUdf.getName()).thenReturn("flatten");
+ List<RexNode> flattenUdfOperands = new ArrayList<>();
+ RexInputRef rexInputRef = mock(RexInputRef.class);
+ when(rexInputRef.getIndex()).thenReturn(0);
+ flattenUdfOperands.add(rexInputRef);
+ when(mockFlattenProject.getOperands()).thenReturn(flattenUdfOperands);
+ Whitebox.setInternalState(mockFlattenProject, "op", sqlFlattenUdf);
+ flattenProjects.add(mockFlattenProject);
+ when(mockProject.getProjects()).thenReturn(flattenProjects);
+
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = new OperatorSpec(OperatorSpec.OpCode.INPUT, "1") {
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ return null;
+ }
+
+ @Override
+ public TimerFunction getTimerFn() {
+ return null;
+ }
+ };
+
+ MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+ when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
+ doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+ RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
+ when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+ Expression mockExpr = mock(Expression.class);
+ when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+
+ // Apply translate() method to verify that we are getting the correct map operator constructed
+ ProjectTranslator projectTranslator = new ProjectTranslator();
+ projectTranslator.translate(mockProject, mockContext);
+ // make sure that context has been registered with LogicFilter and output message streams
+ verify(mockContext, times(1)).registerRelNode(2, mockProject);
+ verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+ when(mockContext.getRelNode(2)).thenReturn(mockProject);
+ when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+
+
+ Collection<OperatorSpec>
+ nextOps = ((OperatorSpec) Whitebox.getInternalState(mockStream, "operatorSpec")).getRegisteredOperatorSpecs();
+ StreamOperatorSpec flattenOp = (StreamOperatorSpec) nextOps.iterator().next();
+ assertNotNull(flattenOp);
+ Object testObj = new Object();
+ SamzaSqlRelMessage mockMsg = new SamzaSqlRelMessage(new ArrayList<String>() {{
+ this.add("test_field_no1");
+ }}, new ArrayList<Object>() {{
+ this.add(testObj);
+ }});
+ Collection<SamzaSqlRelMessage> flattenedMsgs = flattenOp.getTransformFn().apply(mockMsg);
+ assertTrue(flattenedMsgs.size() == 1);
+ assertTrue(flattenedMsgs.stream().anyMatch(s -> s.getSamzaSqlRelRecord().getFieldValues().get(0).equals(testObj)));
+ List<Integer> testList = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ testList.add(new Integer(i));
+ }
+ mockMsg = new SamzaSqlRelMessage(new ArrayList<String>() {{
+ this.add("test_list_field1");
+ }}, new ArrayList<Object>() {{
+ this.add(testList);
+ }});
+ flattenedMsgs = flattenOp.getTransformFn().apply(mockMsg);
+ assertTrue(flattenedMsgs.size() == 10);
+ List<Integer> actualList = flattenedMsgs.stream()
+ .map(m -> ((List<Integer>) m.getSamzaSqlRelRecord().getFieldValues().get(0)).get(0))
+ .collect(ArrayList::new, (c, a) -> c.add(a), (c1, c2) -> c1.addAll(c2));
+ assertEquals(testList, actualList);
+
+ StreamOperatorSpec projectSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
+ assertNotNull(projectSpec);
+ assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
+
+ // Verify that the init() method will establish the context for the map function
+ Config mockConfig = mock(Config.class);
+ TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
+ new HashSet<>(), null, null, null, null, null, null);
+ taskContext.setUserContext(mockContext);
+ projectSpec.getTransformFn().init(mockConfig, taskContext);
+ MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
+ assertNotNull(mapFn);
+ assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
+ assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
+ assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
+
+ // Calling mapFn.apply() to verify the filter function is correctly applied to the input message
+ SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+ SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
+ DataContext dataContext = mock(DataContext.class);
+ when(mockContext.getExecutionContext()).thenReturn(executionContext);
+ when(mockContext.getDataContext()).thenReturn(dataContext);
+ Object[] result = new Object[1];
+ final Object mockFieldObj = new Object();
+
+ doAnswer( invocation -> {
+ Object[] retValue = invocation.getArgumentAt(3, Object[].class);
+ retValue[0] = mockFieldObj;
+ return null;
+ }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
+ eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
+ SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
+ assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(),
+ new ArrayList<String>() {{
+ this.add("test_field");
+ }});
+ assertEquals(retMsg.getSamzaSqlRelRecord().getFieldValues(), new ArrayList<Object>() {{
+ this.add(mockFieldObj);
+ }});
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
new file mode 100644
index 0000000..65b8c8c
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -0,0 +1,596 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.HashSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+public class TestQueryTranslator {
+
+ // Helper functions to validate the cloned copies of TranslatorContext and SamzaSqlExecutionContext
+ /**
+  * Asserts that {@code clonedContext} is a separate object from {@code originContext} that still shares,
+  * by reference, the expression compiler, the stream graph and the internal {@code relSamzaConverters},
+  * {@code messsageStreams} and {@code relNodes} state, while having a different data context and a
+  * cloned execution context.
+  *
+  * @param originContext the context the clone was derived from
+  * @param clonedContext the cloned context to validate
+  */
+ private void validateClonedTranslatorContext(TranslatorContext originContext, TranslatorContext clonedContext) {
+   Assert.assertNotEquals(originContext, clonedContext);
+   // Identity checks use assertSame so a failure reports both objects instead of a bare "false".
+   // The expression compiler is checked once; the original asserted it twice.
+   Assert.assertSame(originContext.getExpressionCompiler(), clonedContext.getExpressionCompiler());
+   Assert.assertSame(originContext.getStreamGraph(), clonedContext.getStreamGraph());
+   // Fields are read reflectively by name. The "messsageStreams" spelling is kept as-is because it is
+   // presumably the declared field name in TranslatorContext -- verify before "fixing" it.
+   Assert.assertSame(Whitebox.getInternalState(originContext, "relSamzaConverters"),
+       Whitebox.getInternalState(clonedContext, "relSamzaConverters"));
+   Assert.assertSame(Whitebox.getInternalState(originContext, "messsageStreams"),
+       Whitebox.getInternalState(clonedContext, "messsageStreams"));
+   Assert.assertSame(Whitebox.getInternalState(originContext, "relNodes"),
+       Whitebox.getInternalState(clonedContext, "relNodes"));
+   Assert.assertNotEquals(originContext.getDataContext(), clonedContext.getDataContext());
+   validateClonedExecutionContext(originContext.getExecutionContext(), clonedContext.getExecutionContext());
+ }
+
+ /**
+  * Asserts that {@code clonedContext} is a separate object from {@code originContext} that shares the
+  * internal {@code sqlConfig} and {@code udfMetadata} by reference but holds its own
+  * {@code udfInstances} object.
+  *
+  * @param originContext the execution context the clone was derived from
+  * @param clonedContext the cloned execution context to validate
+  */
+ private void validateClonedExecutionContext(SamzaSqlExecutionContext originContext,
+     SamzaSqlExecutionContext clonedContext) {
+   Assert.assertNotEquals(originContext, clonedContext);
+   // assertSame/assertNotSame express the identity checks directly and give useful failure messages.
+   Assert.assertSame(Whitebox.getInternalState(originContext, "sqlConfig"),
+       Whitebox.getInternalState(clonedContext, "sqlConfig"));
+   Assert.assertSame(Whitebox.getInternalState(originContext, "udfMetadata"),
+       Whitebox.getInternalState(clonedContext, "udfMetadata"));
+   Assert.assertNotSame(Whitebox.getInternalState(originContext, "udfInstances"),
+       Whitebox.getInternalState(clonedContext, "udfInstances"));
+ }
+
+ private final Map<String, String> configs = new HashMap<>();
+
+ /** Adds {@code job.default.system=kafka} to the shared {@code configs} map before each test. */
+ @Before
+ public void setUp() {
+ configs.put("job.default.system", "kafka");
+ }
+
+ /**
+  * Translates a single-source query that projects {@code MyTest(id)} with a {@code where} filter and
+  * checks that the operator graph has one output stream (testavro / outputTopic) and one input
+  * operator (testavro / SIMPLE1), then validates the per-task context initialization.
+  */
+ @Test
+ public void testTranslate() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+   String statement =
+       "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10";
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, statement);
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+
+   OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+
+   // Exactly one sink: testavro / outputTopic.
+   Assert.assertEquals(1, operatorGraph.getOutputStreams().size());
+   Assert.assertEquals("testavro",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("outputTopic",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+
+   // Exactly one source: testavro / SIMPLE1.
+   Assert.assertEquals(1, operatorGraph.getInputOperators().size());
+   Assert.assertEquals("testavro",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("SIMPLE1",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+
+   validatePerTaskContextInit(streamGraphSpec, jobConfig);
+ }
+
+ /**
+  * Calls the graph's context manager {@code init()} twice against the same task context and verifies
+  * that each call leaves a {@link TranslatorContext} as the user context, and that the two contexts
+  * pass {@code validateClonedTranslatorContext}.
+  *
+  * @param graphSpec the translated stream graph whose context manager is exercised
+  * @param samzaConfig the job config passed to {@code init()}
+  */
+ private void validatePerTaskContextInit(StreamGraphSpec graphSpec, Config samzaConfig) {
+   TaskName taskName = new TaskName("Partition 1");
+   TaskContextImpl taskContext = new TaskContextImpl(taskName, null, null,
+       new HashSet<>(), null, null, null, null, null, null);
+
+   // First init(): should install a TranslatorContext as the task's user context.
+   graphSpec.getContextManager().init(samzaConfig, taskContext);
+   Object firstUserContext = taskContext.getUserContext();
+   Assert.assertNotNull(firstUserContext);
+   Assert.assertTrue(firstUserContext instanceof TranslatorContext);
+   TranslatorContext firstContext = (TranslatorContext) firstUserContext;
+
+   // Second init() on the same task context: should install another TranslatorContext.
+   graphSpec.getContextManager().init(samzaConfig, taskContext);
+   Assert.assertTrue(taskContext.getUserContext() instanceof TranslatorContext);
+   TranslatorContext secondContext = (TranslatorContext) taskContext.getUserContext();
+
+   // The two installed contexts must be clones of each other.
+   validateClonedTranslatorContext(firstContext, secondContext);
+ }
+
+ /**
+  * Translates a query that applies {@code Flatten} to an array column of testavro.COMPLEX1 and checks
+  * that the operator graph has one output stream (testavro / outputTopic) and one input operator
+  * (testavro / COMPLEX1), then validates the per-task context initialization.
+  */
+ @Test
+ public void testTranslateComplex() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+   String statement = "Insert into testavro.outputTopic select Flatten(array_values) from testavro.COMPLEX1";
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, statement);
+   // Disabled alternative statement (group-by over a tumbling window), kept from the original test:
+   // config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+   //     "Insert into testavro.foo2 select string_value, SUM(id) from testavro.COMPLEX1 "
+   //         + "GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), string_value");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+
+   OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+
+   // Exactly one sink: testavro / outputTopic.
+   Assert.assertEquals(1, operatorGraph.getOutputStreams().size());
+   Assert.assertEquals("testavro",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("outputTopic",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+
+   // Exactly one source: testavro / COMPLEX1.
+   Assert.assertEquals(1, operatorGraph.getInputOperators().size());
+   Assert.assertEquals("testavro",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("COMPLEX1",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+
+   validatePerTaskContextInit(streamGraphSpec, jobConfig);
+ }
+
+ /**
+  * Translates a query whose {@code from} clause is a sub-select over testavro.COMPLEX1 and checks that
+  * the operator graph has one output stream (testavro / outputTopic) and one input operator
+  * (testavro / COMPLEX1), then validates the per-task context initialization.
+  */
+ @Test
+ public void testTranslateSubQuery() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+   String statement =
+       "Insert into testavro.outputTopic select Flatten(a), id from (select id, array_values a, string_value s from testavro.COMPLEX1)";
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, statement);
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+
+   OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+
+   // Exactly one sink: testavro / outputTopic.
+   Assert.assertEquals(1, operatorGraph.getOutputStreams().size());
+   Assert.assertEquals("testavro",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("outputTopic",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+
+   // Exactly one source: testavro / COMPLEX1.
+   Assert.assertEquals(1, operatorGraph.getInputOperators().size());
+   Assert.assertEquals("testavro",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("COMPLEX1",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+
+   validatePerTaskContextInit(streamGraphSpec, jobConfig);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a query that lists a stream and a table in the {@code from}
+  * clause separated by a comma and relates them only through the {@code where} clause.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamTableJoinWithoutJoinOperator() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
+           + " where p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a query that uses {@code full join} between a stream and a table.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamTableJoinWithFullJoinOperator() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " full join testavro.PROFILE.`$table` as p"
+           + " on p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects an {@link IllegalStateException} for a query that joins testavro.PROFILE.`$table` with itself.
+  */
+ @Test (expected = IllegalStateException.class)
+ public void testTranslateStreamTableJoinWithSelfJoinOperator() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p1.name as profileName"
+           + " from testavro.PROFILE.`$table` as p1"
+           + " join testavro.PROFILE.`$table` as p2"
+           + " on p1.id = p2.id");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a stream-table join whose {@code on} condition is an
+  * inequality ({@code <>}) instead of an equality.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamTableJoinWithThetaCondition() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " join testavro.PROFILE.`$table` as p"
+           + " on p.id <> pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a query that lists a stream and a table in the {@code from}
+  * clause with no join condition at all.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamTableCrossJoin() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a stream-table join whose {@code on} clause combines the key
+  * equality with an additional comparison against a string literal.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamTableJoinWithAndLiteralCondition() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " join testavro.PROFILE.`$table` as p"
+           + " on p.id = pv.profileId and p.name = 'John'");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a query that relates a stream to a table through a
+  * {@code where exists (...)} sub-query instead of a join.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamTableJoinWithSubQuery() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " where exists "
+           + " (select p.id from testavro.PROFILE.`$table` as p"
+           + " where p.id = pv.profileId)");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a join in which both sides are {@code `$table`} sources.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateTableTableJoin() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW.`$table` as pv"
+           + " join testavro.PROFILE.`$table` as p"
+           + " on p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a join in which neither side is a {@code `$table`} source.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamStreamJoin() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " join testavro.PROFILE as p"
+           + " on p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a {@code left join} whose left side is the {@code `$table`}
+  * source and whose right side is the stream.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateJoinWithIncorrectLeftJoin() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW.`$table` as pv"
+           + " left join testavro.PROFILE as p"
+           + " on p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a {@code right join} whose left side is the stream and whose
+  * right side is the {@code `$table`} source.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateJoinWithIncorrectRightJoin() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " right join testavro.PROFILE.`$table` as p"
+           + " on p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} when the table side of the join is written as
+  * {@code testavro.`$table`} (no stream name) and the "config" source resolver is backed by
+  * {@link ConfigBasedIOResolverFactory}.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamTableInnerJoinWithMissingStream() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+
+   // Register ConfigBasedIOResolverFactory as the factory of the "config" source resolver domain.
+   String resolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+   String resolverFactoryKey = resolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY;
+   sqlConfigMap.put(resolverFactoryKey, ConfigBasedIOResolverFactory.class.getName());
+
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " join testavro.`$table` as p"
+           + " on p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Expects a {@link SamzaException} for a stream-table join whose {@code on} condition wraps both
+  * keys in a {@code MyTest(...)} function call.
+  */
+ @Test (expected = SamzaException.class)
+ public void testTranslateStreamTableInnerJoinWithUdf() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " join testavro.PROFILE.`$table` as p"
+           + " on MyTest(p.id) = MyTest(pv.profileId)");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+ }
+
+ /**
+  * Translates an inner join between the PAGEVIEW stream and the PROFILE {@code `$table`} source and
+  * checks the resulting graph: two output streams (the kafka "sql-job-1-partition_by-stream_1" stream
+  * and testavro / enrichedPageViewTopic) and three input operators (PAGEVIEW, PROFILE and the
+  * kafka "sql-job-1-partition_by-stream_1" stream), in that iteration order.
+  */
+ @Test
+ public void testTranslateStreamTableInnerJoin() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " join testavro.PROFILE.`$table` as p"
+           + " on p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+
+   OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+
+   // Output streams: first the kafka partition-by stream, then the final sink topic.
+   Assert.assertEquals(2, operatorGraph.getOutputStreams().size());
+   Assert.assertEquals("kafka",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("sql-job-1-partition_by-stream_1",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+   Assert.assertEquals("testavro",
+       operatorGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+   Assert.assertEquals("enrichedPageViewTopic",
+       operatorGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+   // Input operators: PAGEVIEW, PROFILE, then the kafka partition-by stream.
+   Assert.assertEquals(3, operatorGraph.getInputOperators().size());
+   Assert.assertEquals("testavro",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("PAGEVIEW",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+   Assert.assertEquals("testavro",
+       operatorGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+   Assert.assertEquals("PROFILE",
+       operatorGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+   Assert.assertEquals("kafka",
+       operatorGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+   Assert.assertEquals("sql-job-1-partition_by-stream_1",
+       operatorGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+
+   validatePerTaskContextInit(streamGraphSpec, jobConfig);
+ }
+
+ /**
+  * Translates a left join from the PAGEVIEW stream to the PROFILE {@code `$table`} source and checks
+  * the resulting graph: two output streams (the kafka "sql-job-1-partition_by-stream_1" stream and
+  * testavro / enrichedPageViewTopic) and three input operators (PAGEVIEW, PROFILE and the kafka
+  * "sql-job-1-partition_by-stream_1" stream), in that iteration order.
+  */
+ @Test
+ public void testTranslateStreamTableLeftJoin() {
+   Map<String, String> sqlConfigMap = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+   sqlConfigMap.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+       "Insert into testavro.enrichedPageViewTopic"
+           + " select p.name as profileName, pv.pageKey"
+           + " from testavro.PAGEVIEW as pv"
+           + " left join testavro.PROFILE.`$table` as p"
+           + " on p.id = pv.profileId");
+
+   Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlConfigMap));
+   SamzaSqlApplicationConfig appConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlConfigMap));
+   QueryTranslator queryTranslator = new QueryTranslator(appConfig);
+   SamzaSqlQueryParser.QueryInfo firstQuery = appConfig.getQueryInfo().get(0);
+   LocalApplicationRunner runner = new LocalApplicationRunner(jobConfig);
+   StreamGraphSpec streamGraphSpec = new StreamGraphSpec(runner, jobConfig);
+   queryTranslator.translate(firstQuery, streamGraphSpec);
+
+   OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+
+   // Output streams: first the kafka partition-by stream, then the final sink topic.
+   Assert.assertEquals(2, operatorGraph.getOutputStreams().size());
+   Assert.assertEquals("kafka",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("sql-job-1-partition_by-stream_1",
+       operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+   Assert.assertEquals("testavro",
+       operatorGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+   Assert.assertEquals("enrichedPageViewTopic",
+       operatorGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+   // Input operators: PAGEVIEW, PROFILE, then the kafka partition-by stream.
+   Assert.assertEquals(3, operatorGraph.getInputOperators().size());
+   Assert.assertEquals("testavro",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+   Assert.assertEquals("PAGEVIEW",
+       operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+   Assert.assertEquals("testavro",
+       operatorGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+   Assert.assertEquals("PROFILE",
+       operatorGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+   Assert.assertEquals("kafka",
+       operatorGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+   Assert.assertEquals("sql-job-1-partition_by-stream_1",
+       operatorGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+
+   validatePerTaskContextInit(streamGraphSpec, jobConfig);
+ }
+
+  /**
+   * Translates a RIGHT JOIN with the PROFILE table on the left and the PAGEVIEW stream on the
+   * right, then verifies the streams wired into the resulting operator graph.
+   *
+   * The assertions address streams by position, so they depend on the iteration order of the maps
+   * returned by {@code getOutputStreams()} and {@code getInputOperators()}.
+   */
+  @Test
+  public void testTranslateStreamTableRightJoin() {
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String statement =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PROFILE.`$table` as p"
+            + " right join testavro.PAGEVIEW as pv"
+            + " on p.id = pv.profileId";
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, statement);
+
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(staticConfigs));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(staticConfigs));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraph = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraph);
+
+    OperatorSpecGraph operatorGraph = streamGraph.getOperatorSpecGraph();
+
+    // Outputs, in iteration order: the partition_by stream on kafka, then the sink topic.
+    Assert.assertEquals(2, operatorGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka", operatorGraph.getOutputStreams().keySet().iterator().next().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        operatorGraph.getOutputStreams().keySet().iterator().next().getPhysicalName());
+    Assert.assertEquals("testavro",
+        operatorGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic",
+        operatorGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    // Inputs, in iteration order: PROFILE, PAGEVIEW, then the partition_by stream read back from kafka.
+    Assert.assertEquals(3, operatorGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        operatorGraph.getInputOperators().keySet().iterator().next().getSystemName());
+    Assert.assertEquals("PROFILE",
+        operatorGraph.getInputOperators().keySet().iterator().next().getPhysicalName());
+    Assert.assertEquals("testavro",
+        operatorGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW",
+        operatorGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka",
+        operatorGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        operatorGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+
+    validatePerTaskContextInit(streamGraph, jobConfig);
+  }
+
+  /**
+   * Translates a filtered GROUP BY query with a {@code count(*)} aggregate and verifies the shape
+   * of the resulting operator graph: one input, one output, a window or join operator, and a
+   * non-empty set of operator specs.
+   */
+  @Test
+  public void testTranslateGroupBy() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.pageViewCountTopic"
+            + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
+            + " from testavro.PAGEVIEW as pv"
+            + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+            + " group by (pv.pageKey)";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, graphSpec);
+    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    Assert.assertEquals(1, specGraph.getInputOperators().size());
+    Assert.assertEquals(1, specGraph.getOutputStreams().size());
+    Assert.assertTrue(specGraph.hasWindowOrJoins());
+    // A graph that reports an input and a window/join cannot have an empty operator spec collection.
+    Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs();
+    Assert.assertFalse(operatorSpecs.isEmpty());
+  }
+
+  /**
+   * Runs a GROUP BY query that uses a {@code sum(...)} aggregate through the translator and
+   * expects a {@link SamzaException}. The {@code expected} attribute accepts the exception from
+   * any statement in the method, not only from {@code translate()}.
+   */
+  @Test(expected = SamzaException.class)
+  public void testTranslateGroupByWithSumAggregator() {
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String statement =
+        "Insert into testavro.pageViewCountTopic"
+            + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) as `sum`"
+            + " from testavro.PAGEVIEW as pv"
+            + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+            + " group by (pv.pageKey)";
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, statement);
+
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(staticConfigs));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(staticConfigs));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraph = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraph);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
new file mode 100644
index 0000000..5dd2d21
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
@@ -0,0 +1,118 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link SamzaSqlRelMessageJoinFunction}. They check the field names and values of
+ * the joined output message for inner and left outer joins, with the table on either side.
+ */
+public class TestSamzaSqlRelMessageJoinFunction {
+
+  // Shared fixture: one stream message and one table record, four fields each.
+  private final List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4");
+  private final List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4");
+  private final List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14");
+  private final List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5");
+
+  /** Inner join, table on the right: output is the stream fields followed by the table fields. */
+  @Test
+  public void testWithInnerJoinWithTableOnRight() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+    List<Integer> tableKeyIds = Arrays.asList(0, 1);
+    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+    expectedFieldNames.addAll(tableFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+    expectedFieldValues.addAll(tableFieldValues);
+    assertJoinOutput(expectedFieldNames, expectedFieldValues, outMsg);
+  }
+
+  /** Inner join, table on the left: output is the table fields followed by the stream fields. */
+  @Test
+  public void testWithInnerJoinWithTableOnLeft() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 2);
+    List<Integer> tableKeyIds = Arrays.asList(0, 2);
+    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+    List<String> expectedFieldNames = new ArrayList<>(tableFieldNames);
+    expectedFieldNames.addAll(streamFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues);
+    expectedFieldValues.addAll(streamFieldValues);
+    assertJoinOutput(expectedFieldNames, expectedFieldValues, outMsg);
+  }
+
+  /** Inner join with no matching table record: the function returns null. */
+  @Test
+  public void testNullRecordWithInnerJoin() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+    Assert.assertNull(outMsg);
+  }
+
+  /** Left outer join with no matching table record: table fields are present but all null. */
+  @Test
+  public void testNullRecordWithLeftOuterJoin() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    JoinRelType joinRelType = JoinRelType.LEFT;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+
+    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+    expectedFieldNames.addAll(tableFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+    // One null per table field stands in for the missing table record.
+    expectedFieldValues.addAll(tableFieldNames.stream().map(name -> null).collect(Collectors.toList()));
+    assertJoinOutput(expectedFieldNames, expectedFieldValues, outMsg);
+  }
+
+  /**
+   * Asserts that the joined message carries exactly the expected field names and values, in order,
+   * and that the two lists have the same length.
+   *
+   * @param expectedFieldNames field names the output record should contain, in order
+   * @param expectedFieldValues field values the output record should contain, in order
+   * @param outMsg message returned by the join function; must not be null
+   */
+  private static void assertJoinOutput(List<String> expectedFieldNames, List<Object> expectedFieldValues,
+      SamzaSqlRelMessage outMsg) {
+    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldNames().size(),
+        outMsg.getSamzaSqlRelRecord().getFieldValues().size());
+    Assert.assertEquals(expectedFieldNames, outMsg.getSamzaSqlRelRecord().getFieldNames());
+    Assert.assertEquals(expectedFieldValues, outMsg.getSamzaSqlRelRecord().getFieldValues());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
new file mode 100644
index 0000000..a74993f
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
@@ -0,0 +1,72 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.storage.kv.RocksDbTableProvider;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Base class for all unit tests for translators. It supplies Mockito {@link Answer}s that capture
+ * the message streams and table descriptors passed to stubbed calls, so tests can inspect them.
+ */
+public class TranslatorTestBase {
+  // Streams captured by getRegisterMessageStreamAnswer(), keyed by the id passed as argument 0.
+  Map<Integer, MessageStream> registeredStreams = new HashMap<>();
+  // Mock tables handed out by getRegisteredTableAnswer(), keyed by table id.
+  Map<String, TableImpl> registeredTables = new HashMap<>();
+
+  /**
+   * Builds an answer that stores argument 1 (a {@link MessageStream}) under argument 0 (an
+   * {@link Integer} id) in {@link #registeredStreams}, replacing any earlier entry for that id.
+   *
+   * @return an answer that always yields {@code null} to the stubbed caller
+   */
+  Answer getRegisterMessageStreamAnswer() {
+    return (InvocationOnMock invocation) -> {
+      Integer streamId = invocation.getArgumentAt(0, Integer.class);
+      MessageStream stream = invocation.getArgumentAt(1, MessageStream.class);
+      registeredStreams.put(streamId, stream);
+      return null;
+    };
+  }
+
+  /**
+   * Builds an answer that returns a mock {@link TableImpl} for the {@link TableDescriptor} passed
+   * as argument 0. The first call for a given table id creates and caches the mock; later calls
+   * for the same id return the cached instance without building a new spec or mock.
+   *
+   * @return an answer yielding the mock table registered for the descriptor's table id
+   */
+  Answer getRegisteredTableAnswer() {
+    return (InvocationOnMock invocation) -> {
+      TableDescriptor descriptor = invocation.getArgumentAt(0, TableDescriptor.class);
+      return this.registeredTables.computeIfAbsent(descriptor.getTableId(), tableId -> {
+          TableSpec tableSpec = new TableSpec(tableId, KVSerde.of(new StringSerde(),
+              new JsonSerdeV2<SamzaSqlRelMessage>()), RocksDbTableProvider.class.getCanonicalName(), new HashMap<>());
+          TableImpl tableMock = mock(TableImpl.class);
+          when(tableMock.getTableSpec()).thenReturn(tableSpec);
+          return tableMock;
+        });
+    };
+  }
+
+  /**
+   * Looks up a stream captured by {@link #getRegisterMessageStreamAnswer()}.
+   *
+   * @param id the id the stream was registered under
+   * @return the captured stream, or {@code null} if nothing was registered for {@code id}
+   */
+  MessageStream getRegisteredMessageStream(int id) {
+    return this.registeredStreams.get(id);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
new file mode 100644
index 0000000..c029eb4
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.time.Duration;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example code to implement window-based counter: counts page view events per member over
+ * 10-second tumbling windows and emits the counts to an output stream.
+ */
+public class AppWithGlobalConfigExample implements StreamApplication {
+
+  /**
+   * Runs the example in local execution mode.
+   *
+   * @param args command-line arguments, parsed by {@link CommandLine} to load the job config
+   */
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    AppWithGlobalConfigExample app = new AppWithGlobalConfigExample();
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  /**
+   * Wires the processing pipeline: read keyed page view events, drop the key, count events per
+   * {@code memberId} in 10-second tumbling windows (with an early trigger every 5 messages and
+   * discarding accumulation), and send one {@link PageViewCount} per window pane to the output.
+   *
+   * @param graph the stream graph to add the pipeline to
+   * @param config the job config; not read by this example
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    // NOTE(review): the stream id "myPageViewEevent" looks misspelled; it is left untouched because
+    // it is a runtime identifier that job configs may reference.
+    graph.getInputStream("myPageViewEevent",
+            KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)))
+        .map(KV::getValue)
+        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
+                m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, null, null)
+            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+            .setAccumulationMode(AccumulationMode.DISCARDING), "w1")
+        .map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m)))
+        .sendTo(graph.getOutputStream("pageViewEventPerMemberStream",
+            KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewCount.class))));
+  }
+
+  /**
+   * Input event. Declared static so instances carry no hidden reference to the enclosing
+   * application object, consistent with {@link PageViewCount}.
+   */
+  static class PageViewEvent {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+  }
+
+  /** Output record: the number of page views counted for one member in one window pane. */
+  static class PageViewCount {
+    String memberId;
+    long timestamp;
+    int count;
+
+    /**
+     * @param m window pane keyed by member id; its pane id is parsed as the timestamp and its
+     *          message is the count
+     */
+    PageViewCount(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      // parseLong yields a primitive directly instead of boxing to Long and unboxing again.
+      this.timestamp = Long.parseLong(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+  }
+}