You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/05/25 16:36:46 UTC

[03/10] samza git commit: SAMZA-1659: Serializable OperatorSpec

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
new file mode 100644
index 0000000..be1e317
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+/** Tests for {@link SamzaSqlQueryParser#parseQuery(String)}. */
+public class TestSamzaSqlQueryParser {
+
+  /** A plain insert-select yields the sink, the select statement and its single source. */
+  @Test
+  public void testParseQuery() {
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
+    Assert.assertEquals("log.foo", queryInfo.getSink());
+    Assert.assertEquals("select * from tracking.bar", queryInfo.getSelectQuery());
+    Assert.assertEquals(1, queryInfo.getSources().size());
+    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
+  }
+
+  /** A join query reports both inputs, in order, with the back-quoted {@code $table} segment unquoted. */
+  @Test
+  public void testParseJoinQuery() {
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+    Assert.assertEquals(2, queryInfo.getSources().size());
+    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+    Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
+  }
+
+  /** Statements that are not a complete insert-select must be rejected with a {@link SamzaException}. */
+  @Test
+  public void testParseInvalidQuery() {
+    assertParseFails("select * from tracking.bar");
+    assertParseFails("insert into select * from tracking.bar");
+    assertParseFails("insert into log.off select from tracking.bar");
+  }
+
+  /** Fails the calling test unless parsing {@code sql} throws a {@link SamzaException}. */
+  private static void assertParseFails(String sql) {
+    boolean rejected = false;
+    try {
+      SamzaSqlQueryParser.parseQuery(sql);
+    } catch (SamzaException expected) {
+      rejected = true;
+    }
+    Assert.assertTrue("Expected a SamzaException for query: " + sql, rejected);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
new file mode 100644
index 0000000..88ce443
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.RexToJavaCompiler;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link FilterTranslator}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LogicalFilter.class)
+public class TestFilterTranslator extends TranslatorTestBase {
+  /** Translates a mocked {@link LogicalFilter}, then checks the registered FILTER operator and its function. */
+  @Test
+  public void testTranslate() throws IOException, ClassNotFoundException {
+    // set up the mocked filter node and translator context handed to FilterTranslator.translate()
+    LogicalFilter mockFilter = PowerMockito.mock(LogicalFilter.class);
+    TranslatorContext mockContext = mock(TranslatorContext.class);
+    RelNode mockInput = mock(RelNode.class);
+    when(mockFilter.getInput()).thenReturn(mockInput);
+    when(mockInput.getId()).thenReturn(1);
+    when(mockFilter.getId()).thenReturn(2);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
+    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
+    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
+    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+    Expression mockExpr = mock(Expression.class);
+    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+
+    // Apply translate() method to verify that we are getting the correct filter operator constructed
+    FilterTranslator filterTranslator = new FilterTranslator();
+    filterTranslator.translate(mockFilter, mockContext);
+    // make sure that the context has been registered with the LogicalFilter and the output message stream
+    verify(mockContext, times(1)).registerRelNode(2, mockFilter);
+    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+    when(mockContext.getRelNode(2)).thenReturn(mockFilter);
+    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+    StreamOperatorSpec filterSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
+    assertNotNull(filterSpec);
+    assertEquals(OperatorSpec.OpCode.FILTER, filterSpec.getOpCode());
+
+    // Verify that the init() method will establish the context for the filter function
+    Config mockConfig = mock(Config.class);
+    TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
+        new HashSet<>(), null, null, null, null, null, null);
+    taskContext.setUserContext(mockContext);
+    filterSpec.getTransformFn().init(mockConfig, taskContext);
+    FilterFunction filterFn = (FilterFunction) Whitebox.getInternalState(filterSpec, "filterFn");
+    assertNotNull(filterFn);
+    assertEquals(mockContext, Whitebox.getInternalState(filterFn, "context"));
+    assertEquals(mockFilter, Whitebox.getInternalState(filterFn, "filter"));
+    assertEquals(mockExpr, Whitebox.getInternalState(filterFn, "expr"));
+
+    // Calling filterFn.apply() to verify the filter function is correctly applied to the input message
+    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+    SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
+    DataContext dataContext = mock(DataContext.class);
+    when(mockContext.getExecutionContext()).thenReturn(executionContext);
+    when(mockContext.getDataContext()).thenReturn(dataContext);
+    Object[] result = new Object[1];
+    // stub the compiled expression to write its verdict into slot 0 of the output array (4th argument)
+    doAnswer(invocation -> {
+      Object[] retValue = invocation.getArgumentAt(3, Object[].class);
+      retValue[0] = Boolean.TRUE;
+      return null;
+    }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
+        eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
+    assertTrue(filterFn.apply(mockInputMsg));
+    // re-stub the same call so the expression now rejects the message
+    doAnswer(invocation -> {
+      Object[] retValue = invocation.getArgumentAt(3, Object[].class);
+      retValue[0] = Boolean.FALSE;
+      return null;
+    }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
+        eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
+    assertFalse(filterFn.apply(mockInputMsg));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
new file mode 100644
index 0000000..2de4856
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -0,0 +1,191 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.samza.sql.translator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.RexToJavaCompiler;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link JoinTranslator}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LogicalJoin.class, EnumerableTableScan.class})
+public class TestJoinTranslator extends TranslatorTestBase {
+  /** Translates a mocked inner join with a table scan on the left and verifies the resulting join spec. */
+  @Test
+  public void testTranslateStreamToTableJoin() throws IOException, ClassNotFoundException {
+    // set up the mocked join node and translator context handed to JoinTranslator.translate()
+    LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
+    TranslatorContext mockContext = mock(TranslatorContext.class);
+    RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
+    RelNode mockRightInput = mock(RelNode.class);
+    List<RelNode> inputs = new ArrayList<>();
+    inputs.add(mockLeftInput);
+    inputs.add(mockRightInput);
+    RelOptTable mockLeftTable = mock(RelOptTable.class);
+    when(mockLeftInput.getTable()).thenReturn(mockLeftTable);
+    List<String> qualifiedTableName = new ArrayList<>();
+    qualifiedTableName.add("test");
+    qualifiedTableName.add("LeftTable");
+    when(mockLeftTable.getQualifiedName()).thenReturn(qualifiedTableName);
+    when(mockLeftInput.getId()).thenReturn(1);
+    when(mockRightInput.getId()).thenReturn(2);
+    when(mockJoin.getId()).thenReturn(3);
+    when(mockJoin.getInputs()).thenReturn(inputs);
+    when(mockJoin.getLeft()).thenReturn(mockLeftInput);
+    when(mockJoin.getRight()).thenReturn(mockRightInput);
+    RexCall mockJoinCondition = mock(RexCall.class);
+    when(mockJoinCondition.isAlwaysTrue()).thenReturn(false);
+    when(mockJoinCondition.getKind()).thenReturn(SqlKind.EQUALS);
+    when(mockJoin.getCondition()).thenReturn(mockJoinCondition);
+    RexInputRef mockLeftConditionInput = mock(RexInputRef.class);
+    RexInputRef mockRightConditionInput = mock(RexInputRef.class);
+    when(mockLeftConditionInput.getIndex()).thenReturn(0);
+    when(mockRightConditionInput.getIndex()).thenReturn(0);
+    List<RexNode> condOperands = new ArrayList<>();
+    condOperands.add(mockLeftConditionInput);
+    condOperands.add(mockRightConditionInput);
+    when(mockJoinCondition.getOperands()).thenReturn(condOperands);
+    RelDataType mockLeftCondDataType = mock(RelDataType.class);
+    RelDataType mockRightCondDataType = mock(RelDataType.class);
+    when(mockLeftCondDataType.getSqlTypeName()).thenReturn(SqlTypeName.BOOLEAN);
+    when(mockRightCondDataType.getSqlTypeName()).thenReturn(SqlTypeName.BOOLEAN);
+    when(mockLeftConditionInput.getType()).thenReturn(mockLeftCondDataType);
+    when(mockRightConditionInput.getType()).thenReturn(mockRightCondDataType);
+    RelDataType mockLeftRowType = mock(RelDataType.class);
+    // NOTE(review): field count is stubbed as 0 although one left field name is stubbed below - confirm intent
+    when(mockLeftRowType.getFieldCount()).thenReturn(0);
+
+    when(mockLeftInput.getRowType()).thenReturn(mockLeftRowType);
+    List<String> leftFieldNames = new ArrayList<>();
+    leftFieldNames.add("test_table_field1");
+    List<String> rightStreamFieldNames = new ArrayList<>();
+    rightStreamFieldNames.add("test_stream_field1");
+    when(mockLeftRowType.getFieldNames()).thenReturn(leftFieldNames);
+    RelDataType mockRightRowType = mock(RelDataType.class);
+    when(mockRightInput.getRowType()).thenReturn(mockRightRowType);
+    when(mockRightRowType.getFieldNames()).thenReturn(rightStreamFieldNames);
+
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    OperatorSpec<Object, SamzaSqlRelMessage> mockLeftInputOp = mock(OperatorSpec.class);
+    MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockGraph, mockLeftInputOp);
+    when(mockContext.getMessageStream(eq(mockLeftInput.getId()))).thenReturn(mockLeftInputStream);
+    OperatorSpec<Object, SamzaSqlRelMessage> mockRightInputOp = mock(OperatorSpec.class);
+    MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockGraph, mockRightInputOp);
+    when(mockContext.getMessageStream(eq(mockRightInput.getId()))).thenReturn(mockRightInputStream);
+    when(mockContext.getStreamGraph()).thenReturn(mockGraph);
+
+    // any intermediate stream requested from the mocked graph resolves to a stream built on keyed mocks
+    InputOperatorSpec mockInputOp = mock(InputOperatorSpec.class);
+    OutputStreamImpl mockOutputStream = mock(OutputStreamImpl.class);
+    when(mockInputOp.isKeyed()).thenReturn(true);
+    when(mockOutputStream.isKeyed()).thenReturn(true);
+    IntermediateMessageStreamImpl
+        mockPartitionedStream = new IntermediateMessageStreamImpl(mockGraph, mockInputOp, mockOutputStream);
+    when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class))).thenReturn(mockPartitionedStream);
+
+    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class));
+    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
+    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+    Expression mockExpr = mock(Expression.class);
+    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+
+    doAnswer(this.getRegisteredTableAnswer()).when(mockGraph).getTable(any(RocksDbTableDescriptor.class));
+    when(mockJoin.getJoinType()).thenReturn(JoinRelType.INNER);
+    SqlIOResolver mockResolver = mock(SqlIOResolver.class);
+    SqlIOConfig mockIOConfig = mock(SqlIOConfig.class);
+    TableDescriptor mockTableDesc = mock(TableDescriptor.class);
+    when(mockResolver.fetchSourceInfo(String.join(".", qualifiedTableName))).thenReturn(mockIOConfig);
+    when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
+
+    // Apply translate() method to verify that we are getting the correct join operator constructed
+    JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver);
+    joinTranslator.translate(mockJoin, mockContext);
+    // make sure that the context has been registered with the output message stream of the join
+    verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
+    when(mockContext.getRelNode(3)).thenReturn(mockJoin);
+    when(mockContext.getMessageStream(3)).thenReturn(this.getRegisteredMessageStream(3));
+    StreamTableJoinOperatorSpec
+        joinSpec = (StreamTableJoinOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(3), "operatorSpec");
+    assertNotNull(joinSpec);
+    assertEquals(OperatorSpec.OpCode.JOIN, joinSpec.getOpCode());
+
+    // Verify joinSpec has the corresponding setup
+    StreamTableJoinFunction joinFn = joinSpec.getJoinFn();
+    assertNotNull(joinFn);
+    assertTrue("isTablePosOnRight should be false", Boolean.FALSE.equals(Whitebox.getInternalState(joinFn, "isTablePosOnRight")));
+    List<Integer> expectedStreamFieldIds = new ArrayList<>();
+    expectedStreamFieldIds.add(0);
+    assertEquals(expectedStreamFieldIds, Whitebox.getInternalState(joinFn, "streamFieldIds"));
+    assertEquals(leftFieldNames, Whitebox.getInternalState(joinFn, "tableFieldNames"));
+    List<String> outputFieldNames = new ArrayList<>();
+    outputFieldNames.addAll(leftFieldNames);
+    outputFieldNames.addAll(rightStreamFieldNames);
+    assertEquals(outputFieldNames, Whitebox.getInternalState(joinFn, "outFieldNames"));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
new file mode 100644
index 0000000..f84dd3f
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -0,0 +1,289 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.samza.sql.translator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.Pair;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.RexToJavaCompiler;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link ProjectTranslator}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LogicalProject.class)
+public class TestProjectTranslator extends TranslatorTestBase {
+  /** Translates a mocked single-field {@link LogicalProject} and checks the registered MAP operator and its function. */
+  @Test
+  public void testTranslate() throws IOException, ClassNotFoundException {
+    // set up the mocked project node and translator context handed to ProjectTranslator.translate()
+    LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
+    TranslatorContext mockContext = mock(TranslatorContext.class);
+    RelNode mockInput = mock(RelNode.class);
+    List<RelNode> inputs = new ArrayList<>();
+    inputs.add(mockInput);
+    when(mockInput.getId()).thenReturn(1);
+    when(mockProject.getId()).thenReturn(2);
+    when(mockProject.getInputs()).thenReturn(inputs);
+    when(mockProject.getInput()).thenReturn(mockInput);
+    RelDataType mockRowType = mock(RelDataType.class);
+    when(mockRowType.getFieldCount()).thenReturn(1);
+    when(mockProject.getRowType()).thenReturn(mockRowType);
+    RexNode mockRexField = mock(RexNode.class);
+    List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
+    namedProjects.add(Pair.of(mockRexField, "test_field"));
+    when(mockProject.getNamedProjects()).thenReturn(namedProjects);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
+    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
+    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
+    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+    Expression mockExpr = mock(Expression.class);
+    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+
+    // Apply translate() method to verify that we are getting the correct map operator constructed
+    ProjectTranslator projectTranslator = new ProjectTranslator();
+    projectTranslator.translate(mockProject, mockContext);
+    // make sure that the context has been registered with the LogicalProject and the output message stream
+    verify(mockContext, times(1)).registerRelNode(2, mockProject);
+    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+    when(mockContext.getRelNode(2)).thenReturn(mockProject);
+    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+    StreamOperatorSpec projectSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
+    assertNotNull(projectSpec);
+    assertEquals(OperatorSpec.OpCode.MAP, projectSpec.getOpCode());
+
+    // Verify that the init() method will establish the context for the map function
+    Config mockConfig = mock(Config.class);
+    TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
+        new HashSet<>(), null, null, null, null, null, null);
+    taskContext.setUserContext(mockContext);
+    projectSpec.getTransformFn().init(mockConfig, taskContext);
+    MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
+    assertNotNull(mapFn);
+    assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
+    assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
+    assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
+
+    // Calling mapFn.apply() to verify the map function is correctly applied to the input message
+    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+    SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
+    DataContext dataContext = mock(DataContext.class);
+    when(mockContext.getExecutionContext()).thenReturn(executionContext);
+    when(mockContext.getDataContext()).thenReturn(dataContext);
+    Object[] result = new Object[1];
+    final Object mockFieldObj = new Object();
+
+    doAnswer(invocation -> {
+      Object[] retValue = invocation.getArgumentAt(3, Object[].class);
+      retValue[0] = mockFieldObj;
+      return null;
+    }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
+        eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
+    SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
+    List<String> expectedFieldNames = new ArrayList<>();
+    expectedFieldNames.add("test_field");
+    assertEquals(expectedFieldNames, retMsg.getSamzaSqlRelRecord().getFieldNames());
+    List<Object> expectedFieldValues = new ArrayList<>();
+    expectedFieldValues.add(mockFieldObj);
+    assertEquals(expectedFieldValues, retMsg.getSamzaSqlRelRecord().getFieldValues());
+
+  }
+
+  // Verifies that ProjectTranslator.translate() chains a flatten operator onto the input stream and registers a MAP
+  // operator as the output stream for a LogicalProject whose only projection is a flatten() call on input field 0.
+  @Test
+  public void testTranslateWithFlatten() throws IOException, ClassNotFoundException {
+    // set up the mocked LogicalProject (one flatten() projection) and the TranslatorContext handed to translate()
+    LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
+    TranslatorContext mockContext = mock(TranslatorContext.class);
+    RelNode mockInput = mock(RelNode.class);
+    List<RelNode> inputs = new ArrayList<>();
+    inputs.add(mockInput);
+    when(mockInput.getId()).thenReturn(1);
+    when(mockProject.getId()).thenReturn(2);
+    when(mockProject.getInputs()).thenReturn(inputs);
+    when(mockProject.getInput()).thenReturn(mockInput);
+    RelDataType mockRowType = mock(RelDataType.class);
+    when(mockRowType.getFieldCount()).thenReturn(1);
+    when(mockProject.getRowType()).thenReturn(mockRowType);
+    RexNode mockRexField = mock(RexNode.class);
+    List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
+    namedProjects.add(Pair.of(mockRexField, "test_field"));
+    when(mockProject.getNamedProjects()).thenReturn(namedProjects);
+    List<RexNode> flattenProjects = new ArrayList<>();
+    RexCall mockFlattenProject = mock(RexCall.class);
+    SqlUserDefinedFunction sqlFlattenUdf = mock(SqlUserDefinedFunction.class);
+    when(sqlFlattenUdf.getName()).thenReturn("flatten");
+    List<RexNode> flattenUdfOperands = new ArrayList<>();
+    RexInputRef rexInputRef = mock(RexInputRef.class);
+    when(rexInputRef.getIndex()).thenReturn(0);
+    flattenUdfOperands.add(rexInputRef);
+    when(mockFlattenProject.getOperands()).thenReturn(flattenUdfOperands);
+    Whitebox.setInternalState(mockFlattenProject, "op", sqlFlattenUdf);
+    flattenProjects.add(mockFlattenProject);
+    when(mockProject.getProjects()).thenReturn(flattenProjects);
+
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = new OperatorSpec(OperatorSpec.OpCode.INPUT, "1") {
+
+      @Override
+      public WatermarkFunction getWatermarkFn() {
+        return null;
+      }
+
+      @Override
+      public TimerFunction getTimerFn() {
+        return null;
+      }
+    };
+
+    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
+    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
+    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
+    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+    Expression mockExpr = mock(Expression.class);
+    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+
+    // Apply translate() method to verify that we are getting the correct map operator constructed
+    ProjectTranslator projectTranslator = new ProjectTranslator();
+    projectTranslator.translate(mockProject, mockContext);
+    // make sure that context has been registered with LogicalProject and output message streams
+    verify(mockContext, times(1)).registerRelNode(2, mockProject);
+    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+    when(mockContext.getRelNode(2)).thenReturn(mockProject);
+    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+
+    // the first operator registered on the input operator is treated as the flatten operator and exercised directly
+    Collection<OperatorSpec> nextOps = ((OperatorSpec) Whitebox.getInternalState(mockStream, "operatorSpec")).getRegisteredOperatorSpecs();
+    StreamOperatorSpec flattenOp = (StreamOperatorSpec) nextOps.iterator().next();
+    assertNotNull(flattenOp);
+    Object testObj = new Object();
+    SamzaSqlRelMessage mockMsg = new SamzaSqlRelMessage(new ArrayList<String>() {{
+      this.add("test_field_no1");
+    }}, new ArrayList<Object>() {{
+      this.add(testObj);
+    }});
+    Collection<SamzaSqlRelMessage> flattenedMsgs = flattenOp.getTransformFn().apply(mockMsg);
+    assertEquals(1, flattenedMsgs.size());
+    assertTrue(flattenedMsgs.stream().anyMatch(s -> s.getSamzaSqlRelRecord().getFieldValues().get(0).equals(testObj)));
+    List<Integer> testList = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      testList.add(i);
+    }
+    mockMsg = new SamzaSqlRelMessage(new ArrayList<String>() {{
+      this.add("test_list_field1");
+    }}, new ArrayList<Object>() {{
+      this.add(testList);
+    }});
+    flattenedMsgs = flattenOp.getTransformFn().apply(mockMsg);
+    assertEquals(10, flattenedMsgs.size());
+    List<Integer> actualList = flattenedMsgs.stream()
+        .map(m -> ((List<Integer>) m.getSamzaSqlRelRecord().getFieldValues().get(0)).get(0))
+        .collect(ArrayList::new, (c, a) -> c.add(a), (c1, c2) -> c1.addAll(c2));
+    assertEquals(testList, actualList);
+
+    StreamOperatorSpec projectSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
+    assertNotNull(projectSpec);
+    assertEquals(OperatorSpec.OpCode.MAP, projectSpec.getOpCode());
+
+    // Verify that the init() method will establish the context for the map function
+    Config mockConfig = mock(Config.class);
+    TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
+        new HashSet<>(), null, null, null, null, null, null);
+    taskContext.setUserContext(mockContext);
+    projectSpec.getTransformFn().init(mockConfig, taskContext);
+    MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
+    assertNotNull(mapFn);
+    assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
+    assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
+    assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
+
+    // Calling mapFn.apply() to verify the project function is correctly applied to the input message
+    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+    SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
+    DataContext dataContext = mock(DataContext.class);
+    when(mockContext.getExecutionContext()).thenReturn(executionContext);
+    when(mockContext.getDataContext()).thenReturn(dataContext);
+    Object[] result = new Object[1];
+    final Object mockFieldObj = new Object();
+
+    doAnswer(invocation -> {
+      Object[] retValue = invocation.getArgumentAt(3, Object[].class);
+      retValue[0] = mockFieldObj;
+      return null;
+    }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
+        eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
+    SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
+    assertEquals(new ArrayList<String>() {{
+          this.add("test_field");
+        }},
+        retMsg.getSamzaSqlRelRecord().getFieldNames());
+    assertEquals(new ArrayList<Object>() {{
+      this.add(mockFieldObj);
+    }}, retMsg.getSamzaSqlRelRecord().getFieldValues());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
new file mode 100644
index 0000000..65b8c8c
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -0,0 +1,596 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.HashSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+public class TestQueryTranslator {
+
+  // Helper functions to validate the cloned copies of TranslatorContext and SamzaSqlExecutionContext
+  private void validateClonedTranslatorContext(TranslatorContext originContext, TranslatorContext clonedContext) {
+    // a clone must share compiler, stream graph and the three registries by reference, but not the data/execution contexts
+    Assert.assertNotEquals(originContext, clonedContext);
+    Assert.assertSame(originContext.getExpressionCompiler(), clonedContext.getExpressionCompiler());
+    Assert.assertSame(originContext.getStreamGraph(), clonedContext.getStreamGraph());
+    Assert.assertSame(Whitebox.getInternalState(originContext, "relSamzaConverters"), Whitebox.getInternalState(clonedContext, "relSamzaConverters"));
+    Assert.assertSame(Whitebox.getInternalState(originContext, "messsageStreams"), Whitebox.getInternalState(clonedContext, "messsageStreams"));
+    Assert.assertSame(Whitebox.getInternalState(originContext, "relNodes"), Whitebox.getInternalState(clonedContext, "relNodes"));
+    Assert.assertNotEquals(originContext.getDataContext(), clonedContext.getDataContext());
+    validateClonedExecutionContext(originContext.getExecutionContext(), clonedContext.getExecutionContext());
+  }
+
+  // Checks that two execution contexts are unequal, share their sqlConfig and udfMetadata objects by reference,
+  // and each hold a different udfInstances object.
+  private void validateClonedExecutionContext(SamzaSqlExecutionContext originContext,
+      SamzaSqlExecutionContext clonedContext) {
+    Assert.assertNotEquals(originContext, clonedContext);
+    Assert.assertSame(Whitebox.getInternalState(originContext, "sqlConfig"), Whitebox.getInternalState(clonedContext, "sqlConfig"));
+    Assert.assertSame(Whitebox.getInternalState(originContext, "udfMetadata"), Whitebox.getInternalState(clonedContext, "udfMetadata"));
+    Assert.assertNotSame(Whitebox.getInternalState(originContext, "udfInstances"),
+        Whitebox.getInternalState(clonedContext, "udfInstances"));
+  }
+
+  private final Map<String, String> configs = new HashMap<>();
+
+  @Before
+  public void setUp() {
+    configs.put("job.default.system", "kafka"); // seeds the map the join tests pass to fetchStaticConfigsWithFactories(configs, n)
+  }
+
+  // Translates an insert-select with a MyTest(id) projection and a filter over one stream, then inspects the graph.
+  @Test
+  public void testTranslate() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+    OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+    // a single output stream: outputTopic on the testavro system
+    Assert.assertEquals(1, operatorGraph.getOutputStreams().size());
+    Assert.assertEquals("testavro", operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("outputTopic", operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    // a single input operator: SIMPLE1 on the testavro system
+    Assert.assertEquals(1, operatorGraph.getInputOperators().size());
+    Assert.assertEquals("testavro", operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("SIMPLE1", operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+
+    validatePerTaskContextInit(streamGraphSpec, jobConfig);
+  }
+
+  private void validatePerTaskContextInit(StreamGraphSpec graphSpec, Config samzaConfig) {
+    // every ContextManager.init() call is expected to install its own TranslatorContext clone as the user context
+    TaskName taskName = new TaskName("Partition 1");
+    TaskContextImpl taskContext = new TaskContextImpl(taskName, null, null, new HashSet<>(), null, null, null, null, null, null);
+    // first init(): the user context must now be a TranslatorContext
+    graphSpec.getContextManager().init(samzaConfig, taskContext);
+    Assert.assertNotNull(taskContext.getUserContext());
+    Assert.assertTrue(taskContext.getUserContext() instanceof TranslatorContext);
+    TranslatorContext firstClone = (TranslatorContext) taskContext.getUserContext();
+    // second init() on the same task context: the user context is read again and compared with the first one
+    graphSpec.getContextManager().init(samzaConfig, taskContext);
+    Assert.assertTrue(taskContext.getUserContext() instanceof TranslatorContext);
+    TranslatorContext secondClone = (TranslatorContext) taskContext.getUserContext();
+    validateClonedTranslatorContext(firstClone, secondClone);
+  }
+
+  // Translates a Flatten(array_values) projection over testavro.COMPLEX1 and inspects the resulting graph.
+  @Test
+  public void testTranslateComplex() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic select Flatten(array_values) from testavro.COMPLEX1");
+    // NOTE(review): disabled alternative statement (SUM with GROUP BY TumbleWindow), kept as found
+//    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+//        "Insert into testavro.foo2 select string_value, SUM(id) from testavro.COMPLEX1 "
+//            + "GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), string_value");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+    OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+    // one output stream (testavro.outputTopic) and one input operator (testavro COMPLEX1)
+    Assert.assertEquals(1, operatorGraph.getOutputStreams().size());
+    Assert.assertEquals("testavro", operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("outputTopic", operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals(1, operatorGraph.getInputOperators().size());
+    Assert.assertEquals("testavro", operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("COMPLEX1", operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+
+    validatePerTaskContextInit(streamGraphSpec, jobConfig);
+  }
+
+  // Translates a Flatten() projection over a sub-select on testavro.COMPLEX1 and inspects the resulting graph.
+  @Test
+  public void testTranslateSubQuery() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.outputTopic select Flatten(a), id from (select id, array_values a, string_value s from testavro.COMPLEX1)");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+    OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+    // a single output stream: outputTopic on the testavro system
+    Assert.assertEquals(1, operatorGraph.getOutputStreams().size());
+    Assert.assertEquals("testavro", operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("outputTopic", operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    // a single input operator: COMPLEX1 on the testavro system
+    Assert.assertEquals(1, operatorGraph.getInputOperators().size());
+    Assert.assertEquals("testavro", operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("COMPLEX1", operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+
+    validatePerTaskContextInit(streamGraphSpec, jobConfig);
+  }
+
+  // Combines PAGEVIEW and PROFILE.`$table` through a comma-separated FROM list plus a WHERE predicate (no JOIN
+  // keyword); the test passes only if a SamzaException is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithoutJoinOperator() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
+            + " where p.id = pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Uses FULL JOIN between PAGEVIEW and PROFILE.`$table`; the test passes only if a SamzaException is raised
+  // somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithFullJoinOperator() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " full join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Joins PROFILE.`$table` with itself; unlike the other negative join tests in this class, the expected failure
+  // here is an IllegalStateException.
+  @Test(expected = IllegalStateException.class)
+  public void testTranslateStreamTableJoinWithSelfJoinOperator() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p1.name as profileName"
+            + " from testavro.PROFILE.`$table` as p1"
+            + " join testavro.PROFILE.`$table` as p2"
+            + " on p1.id = p2.id");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Joins PAGEVIEW with PROFILE.`$table` on an inequality (<>) condition; the test passes only if a SamzaException
+  // is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithThetaCondition() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id <> pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Lists PAGEVIEW and PROFILE.`$table` in FROM without any join condition; the test passes only if a
+  // SamzaException is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamTableCrossJoin() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Adds a literal comparison (p.name = 'John') to the equi-join condition with AND; the test passes only if a
+  // SamzaException is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithAndLiteralCondition() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId and p.name = 'John'");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Relates PAGEVIEW to PROFILE.`$table` through a correlated WHERE EXISTS sub-query instead of a JOIN clause; the
+  // test passes only if a SamzaException is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamTableJoinWithSubQuery() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " where exists "
+            + " (select p.id from testavro.PROFILE.`$table` as p"
+            + " where p.id = pv.profileId)");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Joins two `$table` sources (PAGEVIEW.`$table` and PROFILE.`$table`); the test passes only if a SamzaException
+  // is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateTableTableJoin() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW.`$table` as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Joins PAGEVIEW and PROFILE with neither side marked as `$table`; the test passes only if a SamzaException is
+  // raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamStreamJoin() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE as p"
+            + " on p.id = pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // LEFT JOIN with the `$table` source (PAGEVIEW.`$table`) on the left and plain PROFILE on the right; the test
+  // passes only if a SamzaException is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateJoinWithIncorrectLeftJoin() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW.`$table` as pv"
+            + " left join testavro.PROFILE as p"
+            + " on p.id = pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // RIGHT JOIN with plain PAGEVIEW on the left and the `$table` source (PROFILE.`$table`) on the right; the test
+  // passes only if a SamzaException is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateJoinWithIncorrectRightJoin() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " right join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Registers ConfigBasedIOResolverFactory for the "config" resolver domain, then joins against testavro.`$table`,
+  // which names no stream; the test passes only if a SamzaException is raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamTableInnerJoinWithMissingStream() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String resolverDomainPrefix =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+    sqlJobConfig.put(resolverDomainPrefix + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedIOResolverFactory.class.getName());
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.`$table` as p"
+            + " on p.id = pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Wraps both join keys in MyTest(...) inside the ON condition; the test passes only if a SamzaException is
+  // raised somewhere in the body.
+  @Test(expected = SamzaException.class)
+  public void testTranslateStreamTableInnerJoinWithUdf() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on MyTest(p.id) = MyTest(pv.profileId)");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+  }
+
+  // Translates an inner join of PAGEVIEW with PROFILE.`$table` and checks for two output streams and three inputs.
+  @Test
+  public void testTranslateStreamTableInnerJoin() {
+    Map<String, String> sqlJobConfig = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    sqlJobConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId");
+    Config jobConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(sqlJobConfig));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(sqlJobConfig));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo parsedQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec streamGraphSpec = new StreamGraphSpec(new LocalApplicationRunner(jobConfig), jobConfig);
+    queryTranslator.translate(parsedQuery, streamGraphSpec);
+    OperatorSpecGraph operatorGraph = streamGraphSpec.getOperatorSpecGraph();
+
+    // the kafka stream sql-job-1-partition_by-stream_1 is asserted as the first output here and as the third input below
+    Assert.assertEquals(2, operatorGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka", operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", operatorGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", operatorGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic", operatorGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    Assert.assertEquals(3, operatorGraph.getInputOperators().size());
+    Assert.assertEquals("testavro",
+        operatorGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW",
+        operatorGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        operatorGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE",
+        operatorGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka",
+        operatorGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        operatorGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+
+    validatePerTaskContextInit(streamGraphSpec, jobConfig);
+  }
+
+  /**
+   * Translates a left join of the PAGEVIEW stream with the PROFILE table, then checks the output
+   * streams and input operators of the resulting operator spec graph.
+   */
+  @Test
+  public void testTranslateStreamTableLeftJoin() {
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String leftJoinQuery = "Insert into testavro.enrichedPageViewTopic select p.name as profileName, pv.pageKey"
+        + " from testavro.PAGEVIEW as pv left join testavro.PROFILE.`$table` as p"
+        + " on p.id = pv.profileId";
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, leftJoinQuery);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(staticConfigs));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(staticConfigs));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo firstQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    queryTranslator.translate(firstQuery, graphSpec);
+
+    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    // Output streams, in key iteration order: the kafka partition-by stream, then the sink topic.
+    Assert.assertEquals(2, specGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka",
+        specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic",
+        specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    // Input operators, in key iteration order: PAGEVIEW, PROFILE, then the kafka partition-by stream.
+    Assert.assertEquals(3, specGraph.getInputOperators().size());
+    Assert.assertEquals("testavro", specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW", specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE", specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka", specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+
+    validatePerTaskContextInit(graphSpec, samzaConfig);
+  }
+
+  /**
+   * Translates a right join with the PROFILE table on the left and the PAGEVIEW stream on the
+   * right, then checks the output streams and input operators of the resulting operator spec graph.
+   */
+  @Test
+  public void testTranslateStreamTableRightJoin() {
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String rightJoinQuery = "Insert into testavro.enrichedPageViewTopic select p.name as profileName, pv.pageKey"
+        + " from testavro.PROFILE.`$table` as p right join testavro.PAGEVIEW as pv"
+        + " on p.id = pv.profileId";
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, rightJoinQuery);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(staticConfigs));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(staticConfigs));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo firstQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    queryTranslator.translate(firstQuery, graphSpec);
+
+    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    // Output streams, in key iteration order: the kafka partition-by stream, then the sink topic.
+    Assert.assertEquals(2, specGraph.getOutputStreams().size());
+    Assert.assertEquals("kafka",
+        specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro",
+        specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("enrichedPageViewTopic",
+        specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+
+    // Input operators, in key iteration order: PROFILE, PAGEVIEW, then the kafka partition-by stream.
+    Assert.assertEquals(3, specGraph.getInputOperators().size());
+    Assert.assertEquals("testavro", specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
+    Assert.assertEquals("PROFILE", specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
+    Assert.assertEquals("PAGEVIEW", specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka", specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
+    Assert.assertEquals("sql-job-1-partition_by-stream_1",
+        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+
+    validatePerTaskContextInit(graphSpec, samzaConfig);
+  }
+
+  /**
+   * Translates a filtered group-by/count query over the PAGEVIEW stream and checks that the
+   * resulting operator spec graph has one input operator, one output stream, a window or join,
+   * and a non-empty set of operator specs.
+   */
+  @Test
+  public void testTranslateGroupBy() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sql =
+        "Insert into testavro.pageViewCountTopic"
+            + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
+            + " from testavro.PAGEVIEW as pv"
+            + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+            + " group by (pv.pageKey)";
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+    StreamGraphSpec
+        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    translator.translate(queryInfo, graphSpec);
+    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    Assert.assertEquals(1, specGraph.getInputOperators().size());
+    Assert.assertEquals(1, specGraph.getOutputStreams().size());
+    Assert.assertTrue(specGraph.hasWindowOrJoins());
+    // Previously fetched but never inspected; at minimum the translated graph must expose specs.
+    Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs();
+    Assert.assertFalse(operatorSpecs.isEmpty());
+  }
+
+  /**
+   * Runs the translation sequence for a group-by query that uses a {@code sum} aggregator and
+   * expects a {@link SamzaException} to be thrown somewhere along the way.
+   */
+  @Test (expected = SamzaException.class)
+  public void testTranslateGroupByWithSumAggregator() {
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+    String sumQuery = "Insert into testavro.pageViewCountTopic"
+        + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) as `sum`"
+        + " from testavro.PAGEVIEW as pv"
+        + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+        + " group by (pv.pageKey)";
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sumQuery);
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(staticConfigs));
+    SamzaSqlApplicationConfig sqlAppConfig = new SamzaSqlApplicationConfig(new MapConfig(staticConfigs));
+    QueryTranslator queryTranslator = new QueryTranslator(sqlAppConfig);
+    SamzaSqlQueryParser.QueryInfo firstQuery = sqlAppConfig.getQueryInfo().get(0);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+    queryTranslator.translate(firstQuery, graphSpec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
new file mode 100644
index 0000000..5dd2d21
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java
@@ -0,0 +1,118 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link SamzaSqlRelMessageJoinFunction}. Each test joins a stream message with a
+ * table record (or a null record) and checks the field names and values of the output message.
+ */
+public class TestSamzaSqlRelMessageJoinFunction {
+
+  private final List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4");
+  private final List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4");
+  private final List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14");
+  private final List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5");
+
+  /**
+   * Asserts that the joined message carries exactly the expected field names and values.
+   *
+   * @param expectedFieldNames expected output field names, in order
+   * @param expectedFieldValues expected output field values, in order
+   * @param outMsg the message produced by the join function
+   */
+  private static void assertJoinedMessage(List<String> expectedFieldNames, List<Object> expectedFieldValues,
+      SamzaSqlRelMessage outMsg) {
+    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldNames().size(),
+        outMsg.getSamzaSqlRelRecord().getFieldValues().size());
+    // The expected names used to be computed by the tests but were never compared to the output.
+    Assert.assertEquals(expectedFieldNames, outMsg.getSamzaSqlRelRecord().getFieldNames());
+    Assert.assertEquals(expectedFieldValues, outMsg.getSamzaSqlRelRecord().getFieldValues());
+  }
+
+  /** Inner join with the table on the right: output is the stream fields followed by the table fields. */
+  @Test
+  public void testWithInnerJoinWithTableOnRight() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+    List<Integer> tableKeyIds = Arrays.asList(0, 1);
+    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+    expectedFieldNames.addAll(tableFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+    expectedFieldValues.addAll(tableFieldValues);
+    assertJoinedMessage(expectedFieldNames, expectedFieldValues, outMsg);
+  }
+
+  /** Inner join with the table on the left: output is the table fields followed by the stream fields. */
+  @Test
+  public void testWithInnerJoinWithTableOnLeft() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 2);
+    List<Integer> tableKeyIds = Arrays.asList(0, 2);
+    SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds);
+    KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
+
+    List<String> expectedFieldNames = new ArrayList<>(tableFieldNames);
+    expectedFieldNames.addAll(streamFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues);
+    expectedFieldValues.addAll(streamFieldValues);
+    assertJoinedMessage(expectedFieldNames, expectedFieldValues, outMsg);
+  }
+
+  /** Inner join with a null table record: no output message is produced. */
+  @Test
+  public void testNullRecordWithInnerJoin() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    JoinRelType joinRelType = JoinRelType.INNER;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+    Assert.assertNull(outMsg);
+  }
+
+  /** Left join with a null table record: stream fields are kept and every table field is null. */
+  @Test
+  public void testNullRecordWithLeftOuterJoin() {
+    SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues);
+    JoinRelType joinRelType = JoinRelType.LEFT;
+    List<Integer> streamKeyIds = Arrays.asList(0, 1);
+
+    SamzaSqlRelMessageJoinFunction joinFn =
+        new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames,
+            tableFieldNames);
+    SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
+
+    List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
+    expectedFieldNames.addAll(tableFieldNames);
+    List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
+    // One null placeholder per table field.
+    expectedFieldValues.addAll(tableFieldNames.stream().map(name -> null).collect(Collectors.toList()));
+    assertJoinedMessage(expectedFieldNames, expectedFieldValues, outMsg);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
new file mode 100644
index 0000000..a74993f
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java
@@ -0,0 +1,72 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.storage.kv.RocksDbTableProvider;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Common base class for translator unit tests. It provides Mockito {@link Answer}s that record
+ * their invocation arguments in the maps below so tests can look them up afterwards.
+ */
+public class TranslatorTestBase {
+  Map<Integer, MessageStream> registeredStreams = new HashMap<>();
+  Map<String, TableImpl> registeredTables = new HashMap<>();
+
+  /**
+   * @return an answer that stores argument 1 (a message stream) under argument 0 (its integer id)
+   *         and yields {@code null}
+   */
+  Answer getRegisterMessageStreamAnswer() {
+    return invocation -> {
+      Integer streamId = invocation.getArgumentAt(0, Integer.class);
+      registeredStreams.put(streamId, invocation.getArgumentAt(1, MessageStream.class));
+      return null;
+    };
+  }
+
+  /**
+   * @return an answer that builds a mocked {@link TableImpl} for the table descriptor passed as
+   *         argument 0, registers it under the table id unless one is already registered, and
+   *         yields the registered table
+   */
+  Answer getRegisteredTableAnswer() {
+    return invocation -> {
+      TableDescriptor tableDescriptor = invocation.getArgumentAt(0, TableDescriptor.class);
+      String tableId = tableDescriptor.getTableId();
+      TableSpec tableSpec = new TableSpec(
+          tableId,
+          KVSerde.of(new StringSerde(), new JsonSerdeV2<SamzaSqlRelMessage>()),
+          RocksDbTableProvider.class.getCanonicalName(),
+          new HashMap<>());
+      TableImpl freshTable = mock(TableImpl.class);
+      when(freshTable.getTableSpec()).thenReturn(tableSpec);
+      // The first registration wins; later calls for the same id get the earlier mock back.
+      TableImpl alreadyRegistered = registeredTables.putIfAbsent(tableId, freshTable);
+      return alreadyRegistered != null ? alreadyRegistered : freshTable;
+    };
+  }
+
+  /**
+   * @param id the id a stream was recorded under
+   * @return the recorded stream, or {@code null} if none was recorded for {@code id}
+   */
+  MessageStream getRegisteredMessageStream(int id) {
+    return registeredStreams.get(id);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
new file mode 100644
index 0000000..c029eb4
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.time.Duration;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example application that counts page views per member over 10-second tumbling windows,
+ * emitting early results every 5 messages in discarding accumulation mode.
+ */
+public class AppWithGlobalConfigExample implements StreamApplication {
+
+  /**
+   * Runs the example in local execution mode.
+   *
+   * @param args command-line arguments from which the job config is loaded
+   */
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    AppWithGlobalConfigExample app = new AppWithGlobalConfigExample();
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  /**
+   * Wires the pipeline: read keyed page view events, drop the key, count events per member id in
+   * a keyed tumbling window, and send one {@link PageViewCount} per window pane to the output stream.
+   *
+   * @param graph the stream graph to add the input stream, operators and output stream to
+   * @param config the job config (not used by this example)
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    graph.getInputStream("myPageViewEevent", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)))
+        .map(KV::getValue)
+        // Count starts at 0 and is incremented per message; the two trailing nulls are presumably
+        // the key and window-value serdes - TODO confirm against Windows.keyedTumblingWindow.
+        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, null, null)
+            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+            .setAccumulationMode(AccumulationMode.DISCARDING), "w1")
+        .map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m)))
+        .sendTo(graph.getOutputStream("pageViewEventPerMemberStream", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewCount.class))));
+  }
+
+  /**
+   * Input message type. Declared static because it never uses the enclosing instance, matching
+   * {@link PageViewCount}; a non-static inner class also cannot be instantiated without an
+   * enclosing application object.
+   */
+  static class PageViewEvent {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+  }
+
+  /** Output message type: the count for one member in one window pane. */
+  static class PageViewCount {
+    String memberId;
+    long timestamp;
+    int count;
+
+    /**
+     * @param m the window pane whose key supplies the member id and pane id, and whose message is
+     *          the count
+     * @throws NumberFormatException if the pane id is not a parsable long
+     */
+    PageViewCount(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      // parseLong yields the primitive directly, avoiding the boxing done by Long.valueOf.
+      this.timestamp = Long.parseLong(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+  }
+}