You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/01/24 00:51:38 UTC

[1/2] samza git commit: SAMZA-2084: [SAMZA SQL] add logging when exceptions are swallowed and rename util package

Repository: samza
Updated Branches:
  refs/heads/master 4a08a23ed -> 6a1e85eef


http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
deleted file mode 100644
index 618ca50..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.testutil;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.ListGauge;
-import org.apache.samza.metrics.Timer;
-
-
-/**
- * TestMetricsRegistryImpl implements the MetricRegistry interface and adds get APIs
- * for testing Translators.
- */
-public class TestMetricsRegistryImpl implements org.apache.samza.metrics.MetricsRegistry {
-  private Map<String, List<Counter>> counters = new HashMap<>();
-  private Map<String, List<Timer>> timers = new HashMap<>();
-  private Map<String, List<Gauge<?>>> gauges = new HashMap<>();
-  private Map<String, List<ListGauge>> listGauges = new HashMap<>();
-
-  @Override
-  public Counter newCounter(String group, String name) {
-    Counter counter = new Counter(name);
-    return newCounter(group, counter);
-  }
-
-  @Override
-  public Counter newCounter(String group, Counter counter) {
-    if (!counters.containsKey(group)) {
-      counters.put(group, new ArrayList<>());
-    }
-    counters.get(group).add(counter);
-    return counter;
-  }
-
-  /**
-   * retrieves the Map of Counters
-   * @return counters
-   */
-  public Map<String, List<Counter>> getCounters() {
-    return counters;
-  }
-
-  @Override
-  public Timer newTimer(String group, String name) {
-    Timer timer = new Timer(name);
-    return newTimer(group, timer);
-  }
-
-  @Override
-  public Timer newTimer(String group, Timer timer) {
-    if (!timers.containsKey(group)) {
-      timers.put(group, new ArrayList<>());
-    }
-    timers.get(group).add(timer);
-    return timer;
-  }
-
-  /**
-   * retrieves the Map of Timers
-   * @return timers
-   */
-  public Map<String, List<Timer>> getTimers() {
-    return timers;
-  }
-
-  @Override
-  public <T> Gauge<T> newGauge(String group, String name, T value) {
-    Gauge<T> gauge = new Gauge<>(name, value);
-    return newGauge(group, gauge);
-  }
-
-  @Override
-  public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
-    if (!gauges.containsKey(group)) {
-      gauges.put(group, new ArrayList<>());
-    }
-    gauges.get(group).add(gauge);
-    return gauge;
-  }
-
-  /**
-   * retrieves the Map of Gauges
-   * @return gauges
-   */
-  public Map<String, List<Gauge<?>>> getGauges() {
-    return gauges;
-  }
-
-  @Override
-  public ListGauge newListGauge(String group, ListGauge listGauge) {
-    listGauges.putIfAbsent(group, new ArrayList());
-    listGauges.get(group).add(listGauge);
-    return listGauge;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
deleted file mode 100644
index dd98b92..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlFileParser {
-
-  public static final String TEST_SQL =
-      "insert into log.outputStream \n" + "\tselect * from brooklin.elasticsearchEnterpriseAccounts\n"
-          + "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, "
-          + "description as name3 from brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream \n"
-          + "insert into log.outputstream \n" + "\n" + "\tselect id, MyTest(id) as id2 \n" + "\n"
-          + "\tfrom tracking.SamzaSqlTestTopic1_p8";
-
-  @Test
-  public void testParseSqlFile() throws IOException {
-    File tempFile = File.createTempFile("testparser", "");
-    PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath());
-    fileWriter.println(TEST_SQL);
-    fileWriter.close();
-
-    List<String> sqlStmts = SqlFileParser.parseSqlFile(tempFile.getAbsolutePath());
-    Assert.assertEquals(3, sqlStmts.size());
-    Assert.assertEquals("insert into log.outputStream select * from brooklin.elasticsearchEnterpriseAccounts",
-        sqlStmts.get(0));
-    Assert.assertEquals(
-        "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, description as name3 from brooklin.elasticsearchEnterpriseAccounts",
-        sqlStmts.get(1));
-    Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as id2 from tracking.SamzaSqlTestTopic1_p8",
-        sqlStmts.get(2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
deleted file mode 100644
index be1e317..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-public class TestSamzaSqlQueryParser {
-
-  @Test
-  public void testParseQuery() {
-    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
-    Assert.assertEquals("log.foo", queryInfo.getSink());
-    Assert.assertEquals(queryInfo.getSelectQuery(), "select * from tracking.bar", queryInfo.getSelectQuery());
-    Assert.assertEquals(1, queryInfo.getSources().size());
-    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
-  }
-
-  @Test
-  public void testParseJoinQuery() {
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.PROFILE.`$table` as p"
-            + " on p.id = pv.profileId";
-    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
-    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
-    Assert.assertEquals(2, queryInfo.getSources().size());
-    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
-    Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
-  }
-
-  @Test
-  public void testParseInvalidQuery() {
-
-    try {
-      SamzaSqlQueryParser.parseQuery("select * from tracking.bar");
-      Assert.fail("Expected a samzaException");
-    } catch (SamzaException e) {
-    }
-
-    try {
-      SamzaSqlQueryParser.parseQuery("insert into select * from tracking.bar");
-      Assert.fail("Expected a samzaException");
-    } catch (SamzaException e) {
-    }
-
-    try {
-      SamzaSqlQueryParser.parseQuery("insert into log.off select from tracking.bar");
-      Assert.fail("Expected a samzaException");
-    } catch (SamzaException e) {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index e804113..037201e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -20,7 +20,6 @@
 package org.apache.samza.sql.translator;
 
 import java.io.IOException;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -41,7 +40,7 @@ import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
+import org.apache.samza.sql.util.TestMetricsRegistryImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.internal.util.reflection.Whitebox;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 233fca4..e8f2d1f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -38,13 +38,11 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
+import org.apache.samza.sql.util.TestMetricsRegistryImpl;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 29ce6d0..050971f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -19,7 +19,6 @@
 package org.apache.samza.sql.translator;
 
 import java.io.IOException;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -50,8 +49,7 @@ import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
-import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.sql.util.TestMetricsRegistryImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.internal.util.reflection.Whitebox;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index c81a1fc..efe3896 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -37,10 +37,10 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.TestMetricsRegistryImpl;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
index 4e97ced..2deb6ad 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.sql.translator;
 
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -37,7 +36,7 @@ import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
-import org.apache.samza.sql.testutil.SampleRelTableKeyConverter;
+import org.apache.samza.sql.util.SampleRelTableKeyConverter;
 import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestArrayUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestArrayUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestArrayUdf.java
new file mode 100644
index 0000000..c71813b
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestArrayUdf.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.util;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.SamzaSqlUdf;
+import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+@SamzaSqlUdf(name = "MyTestArray")
+public class MyTestArrayUdf implements ScalarUdf {
+  @Override
+  public void init(Config udfConfig) {
+  }
+
+  @SamzaSqlUdfMethod
+  public List<String> execute(Object... args) {
+    Integer value = (Integer) args[0];
+    return IntStream.range(0, value).mapToObj(String::valueOf).collect(Collectors.toList());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
new file mode 100644
index 0000000..6b714b4
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.util;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.SamzaSqlUdf;
+import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
+import org.apache.samza.sql.udfs.ScalarUdf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test UDF used by unit and integration tests.
+ */
+@SamzaSqlUdf(name = "MyTest")
+public class MyTestUdf implements ScalarUdf {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MyTestUdf.class);
+
+  @SamzaSqlUdfMethod
+  public Integer execute(Object... value) {
+    return ((Integer) value[0]) * 2;
+  }
+
+  @Override
+  public void init(Config udfConfig) {
+    LOG.info("Init called with {}", udfConfig);
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/RemoteStoreIOResolverTestFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/RemoteStoreIOResolverTestFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/util/RemoteStoreIOResolverTestFactory.java
new file mode 100644
index 0000000..f5ee75e
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/RemoteStoreIOResolverTestFactory.java
@@ -0,0 +1,147 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
+
+
+public class RemoteStoreIOResolverTestFactory implements SqlIOResolverFactory {
+  public static final String TEST_REMOTE_STORE_SYSTEM = "testRemoteStore";
+  public static final String TEST_TABLE_ID = "testTableId";
+
+  public static transient Map<Object, Object> records = new HashMap<>();
+
+  @Override
+  public SqlIOResolver create(Config config, Config fullConfig) {
+    return new TestRemoteStoreIOResolver(config);
+  }
+
+  public static class InMemoryWriteFunction implements TableWriteFunction<Object, Object> {
+
+    @Override
+    public CompletableFuture<Void> putAsync(Object key, Object record) {
+      records.put(key.toString(), record);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(Object key) {
+      records.remove(key.toString());
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  static class InMemoryReadFunction implements TableReadFunction<Object, Object> {
+
+    @Override
+    public CompletableFuture<Object> getAsync(Object key) {
+      return CompletableFuture.completedFuture(records.get(key.toString()));
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  private class TestRemoteStoreIOResolver implements SqlIOResolver {
+    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
+    private final Config config;
+    private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
+    private final String changeLogStorePrefix;
+
+    public TestRemoteStoreIOResolver(Config config) {
+      this.config = config;
+      String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+      this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
+    }
+
+    private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
+      String[] sourceComponents = ioName.split("\\.");
+      int systemIdx = 0;
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+      TableDescriptor tableDescriptor = null;
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        streamIdx = endIdx - 1;
+
+        tableDescriptor = tableDescMap.get(ioName);
+
+        if (tableDescriptor == null) {
+          if (isSink) {
+            tableDescriptor = new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + ioName.replace(".", "-").replace("$", "-"))
+                .withReadFunction(new InMemoryReadFunction())
+                .withWriteFunction(new InMemoryWriteFunction());
+          } else if (sourceComponents[systemIdx].equals(TEST_REMOTE_STORE_SYSTEM)) {
+            tableDescriptor = new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + ioName.replace(".", "-").replace("$", "-"))
+                .withReadFunction(new InMemoryReadFunction());
+          } else {
+            // A local table
+            String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
+            SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+                (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+            SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+                (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+            tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
+          }
+          tableDescMap.put(ioName, tableDescriptor);
+        }
+      }
+
+      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
+      return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
+          Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
+    }
+
+    @Override
+    public SqlIOConfig fetchSourceInfo(String sourceName) {
+      return fetchIOInfo(sourceName, false);
+    }
+
+    @Override
+    public SqlIOConfig fetchSinkInfo(String sinkName) {
+      return fetchIOInfo(sinkName, true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelConverterFactory.java
new file mode 100644
index 0000000..5c6b31f
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelConverterFactory.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * SampleRelConverter is an {@link AvroRelConverter} which identifies alternate messages as system messages.
+ * This is used purely for testing system messages.
+ */
+public class SampleRelConverterFactory implements SamzaRelConverterFactory {
+
+  private int i = 0;
+
+  @Override
+  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config) {
+    return new SampleRelConverter(systemStream, (AvroRelSchemaProvider) relSchemaProvider, config);
+  }
+
+  public class SampleRelConverter extends AvroRelConverter {
+    public SampleRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
+      super(systemStream, schemaProvider, config);
+    }
+
+    @Override
+    public boolean isSystemMessage(KV<Object, Object> kv) {
+      // Return alternate ones as system messages.
+      return (i++) % 2 == 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverter.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverter.java
new file mode 100644
index 0000000..1589995
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverter.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.util.stream.Collectors;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+
+
+/**
+ * A sample {@link SamzaRelTableKeyConverter} used in tests to convert the join key to table format.
+ */
+public class SampleRelTableKeyConverter implements SamzaRelTableKeyConverter {
+
+  @Override
+  public Object convertToTableKeyFormat(SamzaSqlRelRecord relRecord) {
+    if (relRecord.getFieldValues().get(0) instanceof SamzaSqlRelRecord) {
+      relRecord = (SamzaSqlRelRecord) relRecord.getFieldValues().get(0);
+    }
+    return relRecord.getFieldValues().stream().map(Object::toString).collect(Collectors.toList()).get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverterFactory.java
new file mode 100644
index 0000000..0234527
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverterFactory.java
@@ -0,0 +1,40 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.util.HashMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * A sample {@link SamzaRelTableKeyConverterFactory} used in tests to create {@link SampleRelTableKeyConverter}.
+ */
+public class SampleRelTableKeyConverterFactory implements SamzaRelTableKeyConverterFactory {
+
+  private final HashMap<SystemStream, SamzaRelTableKeyConverter> relConverters = new HashMap<>();
+
+  @Override
+  public SamzaRelTableKeyConverter create(SystemStream systemStream, Config config) {
+    return relConverters.computeIfAbsent(systemStream, ss -> new SampleRelTableKeyConverter());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
new file mode 100644
index 0000000..19a8638
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -0,0 +1,209 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import com.google.common.base.Joiner;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.sql.avro.AvroRelConverterFactory;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.Company;
+import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.EnrichedPageView;
+import org.apache.samza.sql.avro.schemas.PageView;
+import org.apache.samza.sql.avro.schemas.PageViewCount;
+import org.apache.samza.sql.avro.schemas.Profile;
+import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.sql.fn.BuildOutputRecordUdf;
+import org.apache.samza.sql.fn.FlattenUdf;
+import org.apache.samza.sql.fn.RegexMatchUdf;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+
+import static org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory.TEST_REMOTE_STORE_SYSTEM;
+
+
+/**
+ * Utility to hookup the configs needed to run the Samza Sql application.
+ */
+public class SamzaSqlTestConfig {
+
+  public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
+  public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2";
+  public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
+    return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
+  }
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages) {
+    return fetchStaticConfigsWithFactories(props, numberOfMessages, false);
+  }
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+      boolean includeNullForeignKeys) {
+    return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, false, 0);
+  }
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+      boolean includeNullForeignKeys, boolean includeNullSimpleRecords) {
+    return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, includeNullSimpleRecords, 0);
+  }
+
+  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+      boolean includeNullForeignKeys, boolean includeNullSimpleRecords, long windowDurationMs) {
+    HashMap<String, String> staticConfigs = new HashMap<>();
+
+    staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
+    staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
+    staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
+    String configIOResolverDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+    staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        RemoteStoreIOResolverTestFactory.class.getName());
+
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
+    String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
+    staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedUdfResolver.class.getName());
+    staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, Joiner.on(",")
+        .join(MyTestUdf.class.getName(), RegexMatchUdf.class.getName(), FlattenUdf.class.getName(),
+            MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName()));
+
+    String avroSystemConfigPrefix =
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
+    staticConfigs.put(avroSystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
+    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
+        String.valueOf(numberOfMessages));
+    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
+        includeNullForeignKeys ? "true" : "false");
+    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_SIMPLE_RECORDS,
+        includeNullSimpleRecords ? "true" : "false");
+    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
+        String.valueOf(windowDurationMs / 2));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String testRemoteStoreSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", TEST_REMOTE_STORE_SYSTEM);
+    staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_TABLE_KEY_CONVERTER, "sample");
+    staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String avro2SystemConfigPrefix =
+            String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO2);
+    String avro2SamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO2);
+    staticConfigs.put(avro2SystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
+    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
+            String.valueOf(numberOfMessages));
+    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
+            includeNullForeignKeys ? "true" : "false");
+    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
+            String.valueOf(windowDurationMs / 2));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
+    staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB);
+    staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String avroSamzaToRelMsgConverterDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
+    staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        AvroRelConverterFactory.class.getName());
+
+    String testRemoteStoreSamzaToRelMsgConverterDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, TEST_REMOTE_STORE_SYSTEM);
+    staticConfigs.put(testRemoteStoreSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        AvroRelConverterFactory.class.getName());
+
+    String testRemoteStoreSamzaRelTableKeyConverterDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN, "sample");
+    staticConfigs.put(testRemoteStoreSamzaRelTableKeyConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        SampleRelTableKeyConverterFactory.class.getName());
+
+    String configAvroRelSchemaProviderDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
+    staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedAvroRelSchemaProviderFactory.class.getName());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro2", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "SIMPLE2"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "SIMPLE3"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "simpleOutputTopic"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "Profile"), ComplexRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "PROFILE"), Profile.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "PAGEVIEW"), PageView.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "COMPANY"), Company.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "enrichedPageViewTopic"), EnrichedPageView.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro", "pageViewCountTopic"), PageViewCount.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+            TEST_REMOTE_STORE_SYSTEM, "testTable"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+            TEST_REMOTE_STORE_SYSTEM, "Profile"), Profile.SCHEMA$.toString());
+
+    staticConfigs.putAll(props);
+
+    return staticConfigs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
new file mode 100644
index 0000000..4dd4dd8
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.metrics.Timer;
+
+
+/**
+ * TestMetricsRegistryImpl implements the MetricsRegistry interface and adds get APIs
+ * for testing Translators. Every registered metric is recorded in a per-group list, in registration order.
+ */
+public class TestMetricsRegistryImpl implements org.apache.samza.metrics.MetricsRegistry {
+  private final Map<String, List<Counter>> counters = new HashMap<>();
+  private final Map<String, List<Timer>> timers = new HashMap<>();
+  private final Map<String, List<Gauge<?>>> gauges = new HashMap<>();
+  private final Map<String, List<ListGauge>> listGauges = new HashMap<>();
+
+  @Override
+  public Counter newCounter(String group, String name) {
+    return newCounter(group, new Counter(name));
+  }
+
+  @Override
+  public Counter newCounter(String group, Counter counter) {
+    counters.computeIfAbsent(group, g -> new ArrayList<>()).add(counter);
+    return counter;
+  }
+
+  /**
+   * retrieves the Map of Counters
+   * @return the recorded counters keyed by group (the live internal map, not a copy)
+   */
+  public Map<String, List<Counter>> getCounters() {
+    return counters;
+  }
+
+  @Override
+  public Timer newTimer(String group, String name) {
+    return newTimer(group, new Timer(name));
+  }
+
+  @Override
+  public Timer newTimer(String group, Timer timer) {
+    timers.computeIfAbsent(group, g -> new ArrayList<>()).add(timer);
+    return timer;
+  }
+
+  /**
+   * retrieves the Map of Timers
+   * @return the recorded timers keyed by group (the live internal map, not a copy)
+   */
+  public Map<String, List<Timer>> getTimers() {
+    return timers;
+  }
+
+  @Override
+  public <T> Gauge<T> newGauge(String group, String name, T value) {
+    return newGauge(group, new Gauge<>(name, value));
+  }
+
+  @Override
+  public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
+    gauges.computeIfAbsent(group, g -> new ArrayList<>()).add(gauge);
+    return gauge;
+  }
+
+  /**
+   * retrieves the Map of Gauges
+   * @return the recorded gauges keyed by group (the live internal map, not a copy)
+   */
+  public Map<String, List<Gauge<?>>> getGauges() {
+    return gauges;
+  }
+
+  @Override
+  public ListGauge newListGauge(String group, ListGauge listGauge) {
+    // Typed list via computeIfAbsent; the previous raw ArrayList produced an unchecked warning.
+    listGauges.computeIfAbsent(group, g -> new ArrayList<>()).add(listGauge);
+    return listGauge;
+  }
+
+  /**
+   * retrieves the Map of ListGauges
+   * @return the recorded list gauges keyed by group (the live internal map, not a copy)
+   */
+  public Map<String, List<ListGauge>> getListGauges() {
+    return listGauges;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlFileParser.java
new file mode 100644
index 0000000..d5372ea
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlFileParser.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests that {@link SqlFileParser} splits a SQL file into individual statements.
+ */
+public class TestSamzaSqlFileParser {
+
+  public static final String TEST_SQL =
+      "insert into log.outputStream \n" + "\tselect * from brooklin.elasticsearchEnterpriseAccounts\n"
+          + "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, "
+          + "description as name3 from brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream \n"
+          + "insert into log.outputstream \n" + "\n" + "\tselect id, MyTest(id) as id2 \n" + "\n"
+          + "\tfrom tracking.SamzaSqlTestTopic1_p8";
+
+  /**
+   * Writes {@link #TEST_SQL} to a temp file and checks the three statements the parser returns for it.
+   */
+  @Test
+  public void testParseSqlFile() throws IOException {
+    File tempFile = File.createTempFile("testparser", "");
+    // Do not leave the temp file behind once the test JVM exits.
+    tempFile.deleteOnExit();
+
+    // try-with-resources closes (and flushes) the writer even if println throws.
+    try (PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath())) {
+      fileWriter.println(TEST_SQL);
+    }
+
+    List<String> sqlStmts = SqlFileParser.parseSqlFile(tempFile.getAbsolutePath());
+    Assert.assertEquals(3, sqlStmts.size());
+    Assert.assertEquals("insert into log.outputStream select * from brooklin.elasticsearchEnterpriseAccounts",
+        sqlStmts.get(0));
+    Assert.assertEquals(
+        "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, description as name3 from brooklin.elasticsearchEnterpriseAccounts",
+        sqlStmts.get(1));
+    Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as id2 from tracking.SamzaSqlTestTopic1_p8",
+        sqlStmts.get(2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
new file mode 100644
index 0000000..f15c2f6
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+/**
+ * Tests for {@link SamzaSqlQueryParser#parseQuery(String)}.
+ */
+public class TestSamzaSqlQueryParser {
+
+  @Test
+  public void testParseQuery() {
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
+    Assert.assertEquals("log.foo", queryInfo.getSink());
+    // The previous form passed the actual value as the failure message, which added nothing to the report.
+    Assert.assertEquals("select * from tracking.bar", queryInfo.getSelectQuery());
+    Assert.assertEquals(1, queryInfo.getSources().size());
+    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
+  }
+
+  @Test
+  public void testParseJoinQuery() {
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from testavro.PAGEVIEW as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+    Assert.assertEquals(2, queryInfo.getSources().size());
+    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+    Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
+  }
+
+  @Test
+  public void testParseInvalidQuery() {
+    // No "insert into" clause.
+    assertParseFails("select * from tracking.bar");
+    // "insert into" without a sink.
+    assertParseFails("insert into select * from tracking.bar");
+    // Select without a projection list.
+    assertParseFails("insert into log.off select from tracking.bar");
+  }
+
+  // Fails the test unless parsing the given query throws a SamzaException.
+  private static void assertParseFails(String sql) {
+    try {
+      SamzaSqlQueryParser.parseQuery(sql);
+    } catch (SamzaException expected) {
+      // This is the outcome the test is looking for.
+      return;
+    }
+    Assert.fail("Expected a samzaException for query: " + sql);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 259bfbf..76119e4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -36,10 +36,10 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.serializers.JsonSerdeV2Factory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.MyTestUdf;
-import org.apache.samza.sql.testutil.SampleRelConverterFactory;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.MyTestUdf;
+import org.apache.samza.sql.util.SampleRelConverterFactory;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.junit.Assert;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index c075677..2c9556f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -28,9 +28,9 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
index bc44470..ab60156 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
@@ -43,8 +43,8 @@ import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SqlFileParser;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.SqlFileParser;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.system.kafka.KafkaSystemFactory;
 import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
index 9392aab..5cbe0cc 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
@@ -33,7 +33,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.util.ReflectionUtils;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.tools.CommandLineHelper;
 


[2/2] samza git commit: SAMZA-2084: [SAMZA SQL] add logging when exceptions are swallowed and rename util package

Posted by sr...@apache.org.
SAMZA-2084: [SAMZA SQL] add logging when exceptions are swallowed and rename util package

## What changes were proposed in this pull request?
This PR makes two changes:
1) Samza SQL Shell: add logging where exceptions are currently swallowed.
2) Samza SQL: rename the package "[testutil](https://github.com/apache/samza/tree/master/samza-sql/src/main/java/org/apache/samza/sql/testutil)" to "util".

## How was this patch tested?
The existing unit tests pass.

Author: Weiqing Yang <ya...@gmail.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>

Closes #891 from weiqingy/SAMZA-2084


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a1e85ee
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a1e85ee
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a1e85ee

Branch: refs/heads/master
Commit: 6a1e85eef298e3587b277bf5745d0b1d9962e4ec
Parents: 4a08a23
Author: Weiqing Yang <ya...@gmail.com>
Authored: Wed Jan 23 16:51:33 2019 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Wed Jan 23 16:51:33 2019 -0800

----------------------------------------------------------------------
 .../samza/sql/client/impl/SamzaExecutor.java    |  42 ++--
 .../sql/data/SamzaSqlExecutionContext.java      |   2 +-
 .../samza/sql/dsl/SamzaSqlDslConverter.java     |   4 +-
 .../sql/runner/SamzaSqlApplicationConfig.java   |   6 +-
 .../apache/samza/sql/testutil/ConfigUtil.java   |  62 ------
 .../org/apache/samza/sql/testutil/JsonUtil.java |  91 --------
 .../samza/sql/testutil/ReflectionUtils.java     |  62 ------
 .../samza/sql/testutil/SamzaSqlQueryParser.java | 196 -----------------
 .../samza/sql/testutil/SqlFileParser.java       | 103 ---------
 .../samza/sql/translator/QueryTranslator.java   |   2 +-
 .../org/apache/samza/sql/util/ConfigUtil.java   |  62 ++++++
 .../org/apache/samza/sql/util/JsonUtil.java     |  91 ++++++++
 .../apache/samza/sql/util/ReflectionUtils.java  |  62 ++++++
 .../samza/sql/util/SamzaSqlQueryParser.java     | 196 +++++++++++++++++
 .../apache/samza/sql/util/SqlFileParser.java    | 103 +++++++++
 .../runner/TestSamzaSqlApplicationConfig.java   |   6 +-
 .../runner/TestSamzaSqlApplicationRunner.java   |   2 +-
 .../samza/sql/testutil/MyTestArrayUdf.java      |  42 ----
 .../apache/samza/sql/testutil/MyTestUdf.java    |  50 -----
 .../RemoteStoreIOResolverTestFactory.java       | 147 -------------
 .../sql/testutil/SampleRelConverterFactory.java |  56 -----
 .../testutil/SampleRelTableKeyConverter.java    |  39 ----
 .../SampleRelTableKeyConverterFactory.java      |  41 ----
 .../samza/sql/testutil/SamzaSqlTestConfig.java  | 209 -------------------
 .../sql/testutil/TestMetricsRegistryImpl.java   | 117 -----------
 .../sql/testutil/TestSamzaSqlFileParser.java    |  56 -----
 .../sql/testutil/TestSamzaSqlQueryParser.java   |  75 -------
 .../sql/translator/TestFilterTranslator.java    |   3 +-
 .../sql/translator/TestJoinTranslator.java      |   4 +-
 .../sql/translator/TestProjectTranslator.java   |   4 +-
 .../sql/translator/TestQueryTranslator.java     |   8 +-
 .../TestSamzaSqlRemoteTableJoinFunction.java    |   3 +-
 .../apache/samza/sql/util/MyTestArrayUdf.java   |  42 ++++
 .../org/apache/samza/sql/util/MyTestUdf.java    |  49 +++++
 .../util/RemoteStoreIOResolverTestFactory.java  | 147 +++++++++++++
 .../sql/util/SampleRelConverterFactory.java     |  56 +++++
 .../sql/util/SampleRelTableKeyConverter.java    |  39 ++++
 .../util/SampleRelTableKeyConverterFactory.java |  40 ++++
 .../samza/sql/util/SamzaSqlTestConfig.java      | 209 +++++++++++++++++++
 .../samza/sql/util/TestMetricsRegistryImpl.java | 117 +++++++++++
 .../samza/sql/util/TestSamzaSqlFileParser.java  |  56 +++++
 .../samza/sql/util/TestSamzaSqlQueryParser.java |  75 +++++++
 .../test/samzasql/TestSamzaSqlEndToEnd.java     |   8 +-
 .../test/samzasql/TestSamzaSqlRemoteTable.java  |   6 +-
 .../org/apache/samza/tools/SamzaSqlConsole.java |   4 +-
 .../tools/benchmark/AbstractSamzaBench.java     |   2 +-
 46 files changed, 1398 insertions(+), 1398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 1149364..7df79c9 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -31,7 +31,6 @@ import org.apache.samza.config.*;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.serializers.StringSerdeFactory;
-import org.apache.samza.sql.avro.AvroRelSchemaProvider;
 import org.apache.samza.sql.client.interfaces.*;
 import org.apache.samza.sql.client.util.RandomAccessQueue;
 import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
@@ -49,7 +48,7 @@ import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.schema.SamzaSqlFieldType;
 import org.apache.samza.sql.schema.SqlFieldSchema;
 import org.apache.samza.sql.schema.SqlSchema;
-import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.util.JsonUtil;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.kafka.KafkaSystemFactory;
@@ -126,8 +125,9 @@ public class SamzaExecutor implements SqlExecutor {
         .map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
         .collect(Collectors.toList());
     } catch (ZkTimeoutException ex) {
-      lastErrorMsg = ex.toString();
-      LOG.error(lastErrorMsg);
+      String msg = "listTables failed with exception ";
+      lastErrorMsg = msg + ex.toString();
+      LOG.error(msg, ex);
     }
     return tables;
   }
@@ -151,8 +151,9 @@ public class SamzaExecutor implements SqlExecutor {
                       (o, c) -> ((RelSchemaProviderFactory) o).create(sourceInfo.getSystemStream(), c));
       sqlSchema =  schemaProvider.getSqlSchema();
     } catch (SamzaException ex) {
-      lastErrorMsg = ex.toString();
-      LOG.error(lastErrorMsg);
+      String msg = "getTableSchema failed with exception ";
+      lastErrorMsg = msg + ex.toString();
+      LOG.error(msg, ex);
     }
     return sqlSchema;
   }
@@ -172,8 +173,9 @@ public class SamzaExecutor implements SqlExecutor {
       runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
       runner.run(null);
     } catch (SamzaException ex) {
-      lastErrorMsg = ex.toString();
-      LOG.error(lastErrorMsg);
+      String msg = "Execution failed with exception ";
+      lastErrorMsg = msg + ex.toString();
+      LOG.error(msg, ex);
       return new QueryResult(execId, null, false);
     }
     executions.put(execId, runner);
@@ -214,8 +216,9 @@ public class SamzaExecutor implements SqlExecutor {
     try {
       executedStmts = Files.lines(Paths.get(sqlFile.getPath())).collect(Collectors.toList());
     } catch (IOException e) {
-      lastErrorMsg = String.format("Unable to parse the sql file %s. %s", sqlFile.getPath(), e.toString());
-      LOG.error(lastErrorMsg);
+      String msg = "Unable to parse the sql file " + sqlFile.getAbsolutePath();
+      lastErrorMsg = msg + e.toString();
+      LOG.error(msg, e);
       return new NonQueryResult(-1, false);
     }
     LOG.info("Sql statements in Sql file: " + executedStmts.toString());
@@ -245,8 +248,9 @@ public class SamzaExecutor implements SqlExecutor {
       runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
       runner.run(null);
     } catch (SamzaException ex) {
-      lastErrorMsg = ex.toString();
-      LOG.error(lastErrorMsg);
+      String msg = "Execution of the query failed with exception ";
+      lastErrorMsg = msg + ex.toString();
+      LOG.error(msg, ex);
       return new NonQueryResult(execId, false);
     }
     executions.put(execId, runner);
@@ -265,9 +269,10 @@ public class SamzaExecutor implements SqlExecutor {
 
       try {
         runner.kill();
-      } catch (SamzaException ex) {
-        lastErrorMsg = ex.toString();
-        LOG.debug(lastErrorMsg);
+        } catch (SamzaException ex) {
+        String msg = "Stopping execution failed with exception ";
+        lastErrorMsg = msg + ex.toString();
+        LOG.warn(msg, ex);
         return false;
       }
 
@@ -489,15 +494,18 @@ public class SamzaExecutor implements SqlExecutor {
   }
 
   private String getPrettyFormat(OutgoingMessageEnvelope envelope) {
+    lastErrorMsg = "";
     String value = new String((byte[]) envelope.getMessage());
     ObjectMapper mapper = new ObjectMapper();
     String formattedValue;
     try {
       Object json = mapper.readValue(value, Object.class);
       formattedValue = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
-    } catch (IOException e) {
+    } catch (IOException ex) {
       formattedValue = value;
-      LOG.error("Error while formatting json", e);
+      String msg = "getPrettyFormat failed with exception while formatting json ";
+      lastErrorMsg = msg + ex.toString();
+      LOG.error(msg, ex);
     }
     return formattedValue;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
index cb5c7a7..091ca62 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
@@ -28,7 +28,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.util.ReflectionUtils;
 import org.apache.samza.sql.udfs.ScalarUdf;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index d4cb134..ea0ebfa 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -32,8 +32,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.sql.interfaces.DslConverter;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SqlFileParser;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SqlFileParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index b2a5efe..4883dfb 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -55,9 +55,9 @@ import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.interfaces.UdfResolver;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.ReflectionUtils;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.ReflectionUtils;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
deleted file mode 100644
index 8bbab86..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-
-
-/**
- * Utility methods to aid with config management.
- */
-public class ConfigUtil {
-
-  private ConfigUtil() {
-  }
-
-  /**
-   * Method is used to filter just the properties with the prefix.
-   * @param props Full set of properties
-   * @param prefix Prefix of the keys that in the config that needs to be filtered out.
-   * @param preserveFullKey If set to true, after filtering, preserves the full key including the prefix.
-   *                        If set to false, Strips out the prefix from the key before returning.
-   * @return Returns the filtered set of properties matching the prefix from the input property bag.
-   */
-  public static Properties getDomainProperties(Properties props, String prefix, boolean preserveFullKey) {
-    String fullPrefix;
-    if (StringUtils.isBlank(prefix)) {
-      fullPrefix = ""; // this will effectively retrieve all properties
-    } else {
-      fullPrefix = prefix.endsWith(".") ? prefix : prefix + ".";
-    }
-    Properties ret = new Properties();
-    props.keySet().stream().map(String.class::cast).forEach(keyStr -> {
-      if (keyStr.startsWith(fullPrefix) && !keyStr.equals(fullPrefix)) {
-        if (preserveFullKey) {
-          ret.put(keyStr, props.get(keyStr));
-        } else {
-          ret.put(keyStr.substring(fullPrefix.length()), props.get(keyStr));
-        }
-      }
-    });
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
deleted file mode 100644
index ea78f41..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.io.IOException;
-import java.io.StringWriter;
-
-import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility methods to aid serialization and deserialization of Json.
- */
-public class JsonUtil {
-
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  private static final Logger LOG = LoggerFactory.getLogger(JsonUtil.class.getName());
-
-  static {
-    MAPPER.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-  }
-
-  private JsonUtil() {
-  }
-
-  /**
-   * Deserialize a JSON string into an object based on a type reference.
-   * This method allows the caller to specify precisely the desired output
-   * type for the target object.
-   * @param json JSON string
-   * @param typeRef type reference of the target object
-   * @param <T> type of the target object
-   * @return deserialized Java object
-   */
-  public static <T> T fromJson(String json, TypeReference<T> typeRef) {
-    Validate.notNull(json, "null JSON string");
-    Validate.notNull(typeRef, "null type reference");
-    T object;
-    try {
-      object = MAPPER.readValue(json, typeRef);
-    } catch (IOException e) {
-      String errorMessage = "Failed to parse json: " + json;
-      LOG.error(errorMessage, e);
-      throw new SamzaException(errorMessage, e);
-    }
-    return object;
-  }
-
-  /**
-   * Serialize a Java object into JSON string.
-   * @param object object to be serialized
-   * @param <T> type of the input object
-   * @return JSON string
-   */
-  public static <T> String toJson(T object) {
-    Validate.notNull(object, "null input object");
-    StringWriter out = new StringWriter();
-    try {
-      MAPPER.writeValue(out, object);
-    } catch (IOException e) {
-      String errorMessage = "Failed to serialize object: " + object;
-      LOG.error(errorMessage, e);
-      throw new SamzaException(errorMessage, e);
-    }
-    return out.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
deleted file mode 100644
index 4293c76..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.lang.reflect.Constructor;
-import java.util.stream.IntStream;
-
-import org.apache.commons.lang.Validate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class to simplify usage of Java reflection.
- */
-public class ReflectionUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class);
-
-  private ReflectionUtils() {
-
-  }
-
-  /**
-   * Create an instance of the specified class with constuctor
-   * matching the argument array.
-   * @param clazz name of the class
-   * @param args argument array
-   * @param <T> type fo the class
-   * @return instance of the class, or null if anything went wrong
-   */
-  @SuppressWarnings("unchecked")
-  public static <T> T createInstance(String clazz, Object... args) {
-    Validate.notNull(clazz, "null class name");
-    try {
-      Class<T> classObj = (Class<T>) Class.forName(clazz);
-      Class<?>[] argTypes = new Class<?>[args.length];
-      IntStream.range(0, args.length).forEach(i -> argTypes[i] = args[i].getClass());
-      Constructor<T> ctor = classObj.getDeclaredConstructor(argTypes);
-      return ctor.newInstance(args);
-    } catch (Exception e) {
-      LOG.warn("Failed to create instance for: " + clazz, e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
deleted file mode 100644
index 643c82f..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlAsOperator;
-import org.apache.calcite.sql.SqlBasicCall;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlJoin;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.SqlUnnestOperator;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.samza.SamzaException;
-import org.apache.samza.sql.interfaces.SamzaSqlDriver;
-import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
-
-
-/**
- * Utility class that is used to parse the Samza sql query to figure out the sources, sink etc..
- */
-public class SamzaSqlQueryParser {
-
-  private SamzaSqlQueryParser() {
-  }
-
-  public static class QueryInfo {
-    private final List<String> sources;
-    private String selectQuery;
-    private String sink;
-    private String sql;
-
-    public QueryInfo(String selectQuery, List<String> sources, String sink, String sql) {
-      this.selectQuery = selectQuery;
-      this.sink = sink;
-      this.sources = sources;
-      this.sql = sql;
-    }
-
-    public List<String> getSources() {
-      return sources;
-    }
-
-    public String getSelectQuery() {
-      return selectQuery;
-    }
-
-    public String getSink() {
-      return sink;
-    }
-
-    public String getSql() {
-      return sql;
-    }
-  }
-
-  public static QueryInfo parseQuery(String sql) {
-
-    Pattern insertIntoSqlPattern = Pattern.compile("insert into (.*) (select .* from (.*))", Pattern.CASE_INSENSITIVE);
-    Matcher m = insertIntoSqlPattern.matcher(sql);
-    if (!m.matches()) {
-      throw new SamzaException("Invalid query format");
-    }
-
-    Planner planner = createPlanner();
-    SqlNode sqlNode;
-    try {
-      sqlNode = planner.parse(sql);
-    } catch (SqlParseException e) {
-      throw new SamzaException(e);
-    }
-
-    String sink;
-    String selectQuery;
-    ArrayList<String> sources;
-    if (sqlNode instanceof SqlInsert) {
-      SqlInsert sqlInsert = ((SqlInsert) sqlNode);
-      sink = sqlInsert.getTargetTable().toString();
-      if (sqlInsert.getSource() instanceof SqlSelect) {
-        SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
-        selectQuery = m.group(2);
-        sources = getSourcesFromSelectQuery(sqlSelect);
-      } else {
-        throw new SamzaException("Sql query is not of the expected format");
-      }
-    } else {
-      throw new SamzaException("Sql query is not of the expected format");
-    }
-
-    return new QueryInfo(selectQuery, sources, sink, sql);
-  }
-
-  private static Planner createPlanner() {
-    Connection connection;
-    SchemaPlus rootSchema;
-    try {
-      JavaTypeFactory typeFactory = new SamzaSqlJavaTypeFactoryImpl();
-      SamzaSqlDriver driver = new SamzaSqlDriver(typeFactory);
-      DriverManager.deregisterDriver(DriverManager.getDriver("jdbc:calcite:"));
-      DriverManager.registerDriver(driver);
-      connection = driver.connect("jdbc:calcite:", new Properties());
-      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-      rootSchema = calciteConnection.getRootSchema();
-    } catch (SQLException e) {
-      throw new SamzaException(e);
-    }
-
-    final List<RelTraitDef> traitDefs = new ArrayList<>();
-
-    traitDefs.add(ConventionTraitDef.INSTANCE);
-    traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-        .parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).build())
-        .defaultSchema(rootSchema)
-        .operatorTable(SqlStdOperatorTable.instance())
-        .traitDefs(traitDefs)
-        .context(Contexts.EMPTY_CONTEXT)
-        .costFactory(null)
-        .build();
-    return Frameworks.getPlanner(frameworkConfig);
-  }
-
-  private static ArrayList<String> getSourcesFromSelectQuery(SqlSelect sqlSelect) {
-    ArrayList<String> sources = new ArrayList<>();
-    getSource(sqlSelect.getFrom(), sources);
-    if (sources.size() < 1) {
-      throw new SamzaException("Unsupported query " + sqlSelect);
-    }
-
-    return sources;
-  }
-
-  private static void getSource(SqlNode node, ArrayList<String> sourceList) {
-    if (node instanceof SqlJoin) {
-      SqlJoin joinNode = (SqlJoin) node;
-      ArrayList<String> sourcesLeft = new ArrayList<>();
-      ArrayList<String> sourcesRight = new ArrayList<>();
-      getSource(joinNode.getLeft(), sourcesLeft);
-      getSource(joinNode.getRight(), sourcesRight);
-
-      sourceList.addAll(sourcesLeft);
-      sourceList.addAll(sourcesRight);
-    } else if (node instanceof SqlIdentifier) {
-      sourceList.add(node.toString());
-    } else if (node instanceof SqlBasicCall) {
-      SqlBasicCall basicCall = ((SqlBasicCall) node);
-      if (basicCall.getOperator() instanceof SqlAsOperator) {
-        getSource(basicCall.operand(0), sourceList);
-      } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
-        sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0)));
-      }
-    } else if (node instanceof SqlSelect) {
-      getSource(((SqlSelect) node).getFrom(), sourceList);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
deleted file mode 100644
index ca355ad..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility to read the .sql file and parse out the various sql statements in the file.
- * Right now Samza SQL
- *     Samza SQL supports a sql file with multiple SQL statements where each SQL statement can be spread across
- *       multiple lines.
- *     It supports sql comments where a line starts with "--".
- *     It cannot support multiple sql statements in a single line.
- *     All the empty lines are ignored
- *     All the SQL statements should start with "insert into".
- *
- * e.g. SQL File
- * -- Sample comment
- * insert into log.output1 select * from kafka.input1
- *
- * insert into log.output2
- *   select * from kafka.input2
- *
- * -- You may have empty lines in between a single query.
- * insert into log.output3
- *
- *   select * from kafka.input3
- *
- * -- Below line which contains multiple sql statements are not supported
- * -- insert into log.output4 select * from kafka.input4 insert into log.output5 select * from kafka.input5
- *
- * -- Below SQL statement is not supported because it doesn't start with insert into
- * -- select * from kafka.input6
- */
-public class SqlFileParser {
-
-  private static final String INSERT_CMD = "insert";
-  private static final Logger LOG = LoggerFactory.getLogger(SqlFileParser.class);
-  private static final String SQL_COMMENT_PREFIX = "--";
-
-  private SqlFileParser() {
-  }
-
-  public static List<String> parseSqlFile(String fileName) {
-    Validate.notEmpty(fileName, "fileName cannot be empty.");
-    List<String> sqlLines;
-    try {
-      sqlLines = Files.lines(Paths.get(fileName)).collect(Collectors.toList());
-    } catch (IOException e) {
-      String msg = String.format("Unable to parse the sql file %s", fileName);
-      LOG.error(msg, e);
-      throw new SamzaException(msg, e);
-    }
-    List<String> sqlStmts = new ArrayList<>();
-    String lastStatement = "";
-    for (String sqlLine : sqlLines) {
-      String sql = sqlLine.trim();
-      if (sql.toLowerCase().startsWith(INSERT_CMD)) {
-        if (StringUtils.isNotEmpty(lastStatement)) {
-          sqlStmts.add(lastStatement);
-        }
-
-        lastStatement = sql;
-      } else if (StringUtils.isNotBlank(sql) && !sql.startsWith(SQL_COMMENT_PREFIX)) {
-        lastStatement = String.format("%s %s", lastStatement, sql);
-      }
-    }
-
-    if (!StringUtils.isWhitespace(lastStatement)) {
-      sqlStmts.add(lastStatement);
-    }
-    return sqlStmts;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index c8d1edf..0fdfd39 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -61,7 +61,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.system.descriptors.GenericOutputDescriptor;
 import org.apache.samza.table.Table;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/ConfigUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/util/ConfigUtil.java
new file mode 100644
index 0000000..e87ea2f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/ConfigUtil.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
+
+/**
+ * Utility methods to aid with config management.
+ */
+public class ConfigUtil {
+
+  private ConfigUtil() {
+  }
+
+  /**
+   * Method is used to filter just the properties with the prefix.
+   * @param props Full set of properties
+   * @param prefix Prefix of the keys in the config that should be retained.
+   * @param preserveFullKey If set to true, after filtering, preserves the full key including the prefix.
+   *                        If set to false, strips out the prefix from the key before returning.
+   * @return Returns the filtered set of properties matching the prefix from the input property bag.
+   */
+  public static Properties getDomainProperties(Properties props, String prefix, boolean preserveFullKey) {
+    String fullPrefix;
+    if (StringUtils.isBlank(prefix)) {
+      fullPrefix = ""; // this will effectively retrieve all properties
+    } else {
+      fullPrefix = prefix.endsWith(".") ? prefix : prefix + ".";
+    }
+    Properties ret = new Properties();
+    props.keySet().stream().map(String.class::cast).forEach(keyStr -> {
+      if (keyStr.startsWith(fullPrefix) && !keyStr.equals(fullPrefix)) {
+        if (preserveFullKey) {
+          ret.put(keyStr, props.get(keyStr));
+        } else {
+          ret.put(keyStr.substring(fullPrefix.length()), props.get(keyStr));
+        }
+      }
+    });
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
new file mode 100644
index 0000000..afd6490
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
@@ -0,0 +1,91 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility methods to aid serialization and deserialization of Json.
+ */
+public class JsonUtil {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private static final Logger LOG = LoggerFactory.getLogger(JsonUtil.class.getName());
+
+  static {
+    MAPPER.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private JsonUtil() {
+  }
+
+  /**
+   * Deserialize a JSON string into an object based on a type reference.
+   * This method allows the caller to specify precisely the desired output
+   * type for the target object.
+   * @param json JSON string
+   * @param typeRef type reference of the target object
+   * @param <T> type of the target object
+   * @return deserialized Java object
+   */
+  public static <T> T fromJson(String json, TypeReference<T> typeRef) {
+    Validate.notNull(json, "null JSON string");
+    Validate.notNull(typeRef, "null type reference");
+    T object;
+    try {
+      object = MAPPER.readValue(json, typeRef);
+    } catch (IOException e) {
+      String errorMessage = "Failed to parse json: " + json;
+      LOG.error(errorMessage, e);
+      throw new SamzaException(errorMessage, e);
+    }
+    return object;
+  }
+
+  /**
+   * Serialize a Java object into JSON string.
+   * @param object object to be serialized
+   * @param <T> type of the input object
+   * @return JSON string
+   */
+  public static <T> String toJson(T object) {
+    Validate.notNull(object, "null input object");
+    StringWriter out = new StringWriter();
+    try {
+      MAPPER.writeValue(out, object);
+    } catch (IOException e) {
+      String errorMessage = "Failed to serialize object: " + object;
+      LOG.error(errorMessage, e);
+      throw new SamzaException(errorMessage, e);
+    }
+    return out.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/ReflectionUtils.java b/samza-sql/src/main/java/org/apache/samza/sql/util/ReflectionUtils.java
new file mode 100644
index 0000000..2ef626c
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/ReflectionUtils.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.lang.reflect.Constructor;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to simplify usage of Java reflection.
+ */
+public class ReflectionUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class);
+
+  private ReflectionUtils() {
+
+  }
+
+  /**
+   * Create an instance of the specified class with constructor
+   * matching the argument array.
+   * @param clazz name of the class
+   * @param args argument array
+   * @param <T> type of the class
+   * @return instance of the class, or null if anything went wrong
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T createInstance(String clazz, Object... args) {
+    Validate.notNull(clazz, "null class name");
+    try {
+      Class<T> classObj = (Class<T>) Class.forName(clazz);
+      Class<?>[] argTypes = new Class<?>[args.length];
+      IntStream.range(0, args.length).forEach(i -> argTypes[i] = args[i].getClass());
+      Constructor<T> ctor = classObj.getDeclaredConstructor(argTypes);
+      return ctor.newInstance(args);
+    } catch (Exception e) {
+      LOG.warn("Failed to create instance for: " + clazz, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
new file mode 100644
index 0000000..e81abfa
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -0,0 +1,196 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlAsOperator;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlUnnestOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.interfaces.SamzaSqlDriver;
+import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
+
+
+/**
+ * Utility class that is used to parse the Samza sql query to figure out the sources, sink etc.
+ */
+public class SamzaSqlQueryParser {
+
+  private SamzaSqlQueryParser() {
+  }
+
+  public static class QueryInfo {
+    private final List<String> sources;
+    private String selectQuery;
+    private String sink;
+    private String sql;
+
+    public QueryInfo(String selectQuery, List<String> sources, String sink, String sql) {
+      this.selectQuery = selectQuery;
+      this.sink = sink;
+      this.sources = sources;
+      this.sql = sql;
+    }
+
+    public List<String> getSources() {
+      return sources;
+    }
+
+    public String getSelectQuery() {
+      return selectQuery;
+    }
+
+    public String getSink() {
+      return sink;
+    }
+
+    public String getSql() {
+      return sql;
+    }
+  }
+
+  public static QueryInfo parseQuery(String sql) {
+
+    Pattern insertIntoSqlPattern = Pattern.compile("insert into (.*) (select .* from (.*))", Pattern.CASE_INSENSITIVE);
+    Matcher m = insertIntoSqlPattern.matcher(sql);
+    if (!m.matches()) {
+      throw new SamzaException("Invalid query format");
+    }
+
+    Planner planner = createPlanner();
+    SqlNode sqlNode;
+    try {
+      sqlNode = planner.parse(sql);
+    } catch (SqlParseException e) {
+      throw new SamzaException(e);
+    }
+
+    String sink;
+    String selectQuery;
+    ArrayList<String> sources;
+    if (sqlNode instanceof SqlInsert) {
+      SqlInsert sqlInsert = ((SqlInsert) sqlNode);
+      sink = sqlInsert.getTargetTable().toString();
+      if (sqlInsert.getSource() instanceof SqlSelect) {
+        SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
+        selectQuery = m.group(2);
+        sources = getSourcesFromSelectQuery(sqlSelect);
+      } else {
+        throw new SamzaException("Sql query is not of the expected format");
+      }
+    } else {
+      throw new SamzaException("Sql query is not of the expected format");
+    }
+
+    return new QueryInfo(selectQuery, sources, sink, sql);
+  }
+
+  private static Planner createPlanner() {
+    Connection connection;
+    SchemaPlus rootSchema;
+    try {
+      JavaTypeFactory typeFactory = new SamzaSqlJavaTypeFactoryImpl();
+      SamzaSqlDriver driver = new SamzaSqlDriver(typeFactory);
+      DriverManager.deregisterDriver(DriverManager.getDriver("jdbc:calcite:"));
+      DriverManager.registerDriver(driver);
+      connection = driver.connect("jdbc:calcite:", new Properties());
+      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+      rootSchema = calciteConnection.getRootSchema();
+    } catch (SQLException e) {
+      throw new SamzaException(e);
+    }
+
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).build())
+        .defaultSchema(rootSchema)
+        .operatorTable(SqlStdOperatorTable.instance())
+        .traitDefs(traitDefs)
+        .context(Contexts.EMPTY_CONTEXT)
+        .costFactory(null)
+        .build();
+    return Frameworks.getPlanner(frameworkConfig);
+  }
+
+  private static ArrayList<String> getSourcesFromSelectQuery(SqlSelect sqlSelect) {
+    ArrayList<String> sources = new ArrayList<>();
+    getSource(sqlSelect.getFrom(), sources);
+    if (sources.size() < 1) {
+      throw new SamzaException("Unsupported query " + sqlSelect);
+    }
+
+    return sources;
+  }
+
+  private static void getSource(SqlNode node, ArrayList<String> sourceList) {
+    if (node instanceof SqlJoin) {
+      SqlJoin joinNode = (SqlJoin) node;
+      ArrayList<String> sourcesLeft = new ArrayList<>();
+      ArrayList<String> sourcesRight = new ArrayList<>();
+      getSource(joinNode.getLeft(), sourcesLeft);
+      getSource(joinNode.getRight(), sourcesRight);
+
+      sourceList.addAll(sourcesLeft);
+      sourceList.addAll(sourcesRight);
+    } else if (node instanceof SqlIdentifier) {
+      sourceList.add(node.toString());
+    } else if (node instanceof SqlBasicCall) {
+      SqlBasicCall basicCall = ((SqlBasicCall) node);
+      if (basicCall.getOperator() instanceof SqlAsOperator) {
+        getSource(basicCall.operand(0), sourceList);
+      } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
+        sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0)));
+      }
+    } else if (node instanceof SqlSelect) {
+      getSource(((SqlSelect) node).getFrom(), sourceList);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
new file mode 100644
index 0000000..f996987
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
@@ -0,0 +1,103 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility to read the .sql file and parse out the various sql statements in the file.
+ * Current behavior and limitations:
+ *     Samza SQL supports a sql file with multiple SQL statements where each SQL statement can be spread across
+ *       multiple lines.
+ *     It supports sql comments where a line starts with "--".
+ *     It cannot support multiple sql statements in a single line.
+ *     All the empty lines are ignored
+ *     All the SQL statements should start with "insert into".
+ *
+ * e.g. SQL File
+ * -- Sample comment
+ * insert into log.output1 select * from kafka.input1
+ *
+ * insert into log.output2
+ *   select * from kafka.input2
+ *
+ * -- You may have empty lines in between a single query.
+ * insert into log.output3
+ *
+ *   select * from kafka.input3
+ *
+ * -- The line below, which contains multiple sql statements, is not supported
+ * -- insert into log.output4 select * from kafka.input4 insert into log.output5 select * from kafka.input5
+ *
+ * -- The SQL statement below is not supported because it doesn't start with insert into
+ * -- select * from kafka.input6
+ */
+public class SqlFileParser {
+
+  private static final String INSERT_CMD = "insert";
+  private static final Logger LOG = LoggerFactory.getLogger(SqlFileParser.class);
+  private static final String SQL_COMMENT_PREFIX = "--";
+
+  private SqlFileParser() {
+  }
+
+  public static List<String> parseSqlFile(String fileName) {
+    Validate.notEmpty(fileName, "fileName cannot be empty.");
+    List<String> sqlLines;
+    try {
+      sqlLines = Files.lines(Paths.get(fileName)).collect(Collectors.toList());
+    } catch (IOException e) {
+      String msg = String.format("Unable to parse the sql file %s", fileName);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    }
+    List<String> sqlStmts = new ArrayList<>();
+    String lastStatement = "";
+    for (String sqlLine : sqlLines) {
+      String sql = sqlLine.trim();
+      if (sql.toLowerCase().startsWith(INSERT_CMD)) {
+        if (StringUtils.isNotEmpty(lastStatement)) {
+          sqlStmts.add(lastStatement);
+        }
+
+        lastStatement = sql;
+      } else if (StringUtils.isNotBlank(sql) && !sql.startsWith(SQL_COMMENT_PREFIX)) {
+        lastStatement = String.format("%s %s", lastStatement, sql);
+      }
+    }
+
+    if (!StringUtils.isWhitespace(lastStatement)) {
+      sqlStmts.add(lastStatement);
+    }
+    return sqlStmts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index 294eccd..8d2c588 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -30,9 +30,9 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index ccab449..272ca4a 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -24,7 +24,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
deleted file mode 100644
index 018a733..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.testutil;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.udfs.SamzaSqlUdf;
-import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
-import org.apache.samza.sql.udfs.ScalarUdf;
-
-
-@SamzaSqlUdf(name = "MyTestArray")
-public class MyTestArrayUdf implements ScalarUdf {
-  @Override
-  public void init(Config udfConfig) {
-  }
-
-  @SamzaSqlUdfMethod
-  public List<String> execute(Object... args) {
-    Integer value = (Integer) args[0];
-    return IntStream.range(0, value).mapToObj(String::valueOf).collect(Collectors.toList());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
deleted file mode 100644
index 4241424..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.testutil;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.schema.SamzaSqlFieldType;
-import org.apache.samza.sql.udfs.SamzaSqlUdf;
-import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
-import org.apache.samza.sql.udfs.ScalarUdf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Test UDF used by unit and integration tests.
- */
-@SamzaSqlUdf(name = "MyTest")
-public class MyTestUdf implements ScalarUdf {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MyTestUdf.class);
-
-  @SamzaSqlUdfMethod
-  public Integer execute(Object... value) {
-    return ((Integer) value[0]) * 2;
-  }
-
-  @Override
-  public void init(Config udfConfig) {
-    LOG.info("Init called with {}", udfConfig);
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
deleted file mode 100644
index 6ee9ee8..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
-import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
-import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.table.remote.TableWriteFunction;
-
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
-
-
-public class RemoteStoreIOResolverTestFactory implements SqlIOResolverFactory {
-  public static final String TEST_REMOTE_STORE_SYSTEM = "testRemoteStore";
-  public static final String TEST_TABLE_ID = "testTableId";
-
-  public static transient Map<Object, Object> records = new HashMap<>();
-
-  @Override
-  public SqlIOResolver create(Config config, Config fullConfig) {
-    return new TestRemoteStoreIOResolver(config);
-  }
-
-  public static class InMemoryWriteFunction implements TableWriteFunction<Object, Object> {
-
-    @Override
-    public CompletableFuture<Void> putAsync(Object key, Object record) {
-      records.put(key.toString(), record);
-      return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public CompletableFuture<Void> deleteAsync(Object key) {
-      records.remove(key.toString());
-      return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-  }
-
-  static class InMemoryReadFunction implements TableReadFunction<Object, Object> {
-
-    @Override
-    public CompletableFuture<Object> getAsync(Object key) {
-      return CompletableFuture.completedFuture(records.get(key.toString()));
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-  }
-
-  private class TestRemoteStoreIOResolver implements SqlIOResolver {
-    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
-    private final Config config;
-    private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
-    private final String changeLogStorePrefix;
-
-    public TestRemoteStoreIOResolver(Config config) {
-      this.config = config;
-      String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
-      this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
-    }
-
-    private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
-      String[] sourceComponents = ioName.split("\\.");
-      int systemIdx = 0;
-      int endIdx = sourceComponents.length - 1;
-      int streamIdx = endIdx;
-      TableDescriptor tableDescriptor = null;
-
-      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
-        streamIdx = endIdx - 1;
-
-        tableDescriptor = tableDescMap.get(ioName);
-
-        if (tableDescriptor == null) {
-          if (isSink) {
-            tableDescriptor = new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + ioName.replace(".", "-").replace("$", "-"))
-                .withReadFunction(new InMemoryReadFunction())
-                .withWriteFunction(new InMemoryWriteFunction());
-          } else if (sourceComponents[systemIdx].equals(TEST_REMOTE_STORE_SYSTEM)) {
-            tableDescriptor = new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + ioName.replace(".", "-").replace("$", "-"))
-                .withReadFunction(new InMemoryReadFunction());
-          } else {
-            // A local table
-            String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
-            SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
-                (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
-            SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
-                (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
-            tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
-          }
-          tableDescMap.put(ioName, tableDescriptor);
-        }
-      }
-
-      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
-      return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
-          Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
-    }
-
-    @Override
-    public SqlIOConfig fetchSourceInfo(String sourceName) {
-      return fetchIOInfo(sourceName, false);
-    }
-
-    @Override
-    public SqlIOConfig fetchSinkInfo(String sinkName) {
-      return fetchIOInfo(sinkName, true);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
deleted file mode 100644
index 7c67082..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.sql.avro.AvroRelConverter;
-import org.apache.samza.sql.avro.AvroRelSchemaProvider;
-import org.apache.samza.sql.interfaces.RelSchemaProvider;
-import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * SampleRelConverter is an {@link AvroRelConverter} which identifies alternate messages as system messages.
- * This is used purely for testing system messages.
- */
-public class SampleRelConverterFactory implements SamzaRelConverterFactory {
-
-  private int i = 0;
-
-  @Override
-  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config) {
-    return new SampleRelConverter(systemStream, (AvroRelSchemaProvider) relSchemaProvider, config);
-  }
-
-  public class SampleRelConverter extends AvroRelConverter {
-    public SampleRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
-      super(systemStream, schemaProvider, config);
-    }
-
-    @Override
-    public boolean isSystemMessage(KV<Object, Object> kv) {
-      // Return alternate ones as system messages.
-      return (i++) % 2 == 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
deleted file mode 100644
index ba40ee5..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.stream.Collectors;
-import org.apache.samza.sql.SamzaSqlRelRecord;
-import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
-
-
-/**
- * A sample {@link SamzaRelTableKeyConverter} used in tests to convert the join key to table format.
- */
-public class SampleRelTableKeyConverter implements SamzaRelTableKeyConverter {
-
-  @Override
-  public Object convertToTableKeyFormat(SamzaSqlRelRecord relRecord) {
-    if (relRecord.getFieldValues().get(0) instanceof SamzaSqlRelRecord) {
-      relRecord = (SamzaSqlRelRecord) relRecord.getFieldValues().get(0);
-    }
-    return relRecord.getFieldValues().stream().map(Object::toString).collect(Collectors.toList()).get(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
deleted file mode 100644
index 2853ed4..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.HashMap;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.interfaces.RelSchemaProvider;
-import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
-import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverterFactory;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * A sample {@link SamzaRelTableKeyConverterFactory} used in tests to create {@link SampleRelTableKeyConverter}.
- */
-public class SampleRelTableKeyConverterFactory implements SamzaRelTableKeyConverterFactory {
-
-  private final HashMap<SystemStream, SamzaRelTableKeyConverter> relConverters = new HashMap<>();
-
-  @Override
-  public SamzaRelTableKeyConverter create(SystemStream systemStream, Config config) {
-    return relConverters.computeIfAbsent(systemStream, ss -> new SampleRelTableKeyConverter());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
deleted file mode 100644
index 82e57d5..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import com.google.common.base.Joiner;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.sql.avro.AvroRelConverterFactory;
-import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
-import org.apache.samza.sql.avro.schemas.Company;
-import org.apache.samza.sql.avro.schemas.ComplexRecord;
-import org.apache.samza.sql.avro.schemas.EnrichedPageView;
-import org.apache.samza.sql.avro.schemas.PageView;
-import org.apache.samza.sql.avro.schemas.PageViewCount;
-import org.apache.samza.sql.avro.schemas.Profile;
-import org.apache.samza.sql.avro.schemas.SimpleRecord;
-import org.apache.samza.sql.fn.BuildOutputRecordUdf;
-import org.apache.samza.sql.fn.FlattenUdf;
-import org.apache.samza.sql.fn.RegexMatchUdf;
-import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
-import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-
-import static org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory.TEST_REMOTE_STORE_SYSTEM;
-
-
-/**
- * Utility to hookup the configs needed to run the Samza Sql application.
- */
-public class SamzaSqlTestConfig {
-
-  public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
-  public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2";
-  public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
-
-  public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
-    return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
-  }
-
-  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages) {
-    return fetchStaticConfigsWithFactories(props, numberOfMessages, false);
-  }
-
-  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
-      boolean includeNullForeignKeys) {
-    return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, false, 0);
-  }
-
-  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
-      boolean includeNullForeignKeys, boolean includeNullSimpleRecords) {
-    return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, includeNullSimpleRecords, 0);
-  }
-
-  public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
-      boolean includeNullForeignKeys, boolean includeNullSimpleRecords, long windowDurationMs) {
-    HashMap<String, String> staticConfigs = new HashMap<>();
-
-    staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
-    staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
-    staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
-    staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
-
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
-    String configIOResolverDomain =
-        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
-    staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        RemoteStoreIOResolverTestFactory.class.getName());
-
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
-    String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
-    staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        ConfigBasedUdfResolver.class.getName());
-    staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, Joiner.on(",")
-        .join(MyTestUdf.class.getName(), RegexMatchUdf.class.getName(), FlattenUdf.class.getName(),
-            MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName()));
-
-    String avroSystemConfigPrefix =
-        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
-    String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
-    staticConfigs.put(avroSystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
-    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
-        String.valueOf(numberOfMessages));
-    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
-        includeNullForeignKeys ? "true" : "false");
-    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_SIMPLE_RECORDS,
-        includeNullSimpleRecords ? "true" : "false");
-    staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
-        String.valueOf(windowDurationMs / 2));
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
-
-    String testRemoteStoreSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", TEST_REMOTE_STORE_SYSTEM);
-    staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_TABLE_KEY_CONVERTER, "sample");
-    staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
-
-    String avro2SystemConfigPrefix =
-            String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO2);
-    String avro2SamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO2);
-    staticConfigs.put(avro2SystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
-    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
-            String.valueOf(numberOfMessages));
-    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
-            includeNullForeignKeys ? "true" : "false");
-    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
-            String.valueOf(windowDurationMs / 2));
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
-    staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
-
-    String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB);
-    staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
-
-    String avroSamzaToRelMsgConverterDomain =
-        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
-    staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        AvroRelConverterFactory.class.getName());
-
-    String testRemoteStoreSamzaToRelMsgConverterDomain =
-        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, TEST_REMOTE_STORE_SYSTEM);
-    staticConfigs.put(testRemoteStoreSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        AvroRelConverterFactory.class.getName());
-
-    String testRemoteStoreSamzaRelTableKeyConverterDomain =
-        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN, "sample");
-    staticConfigs.put(testRemoteStoreSamzaRelTableKeyConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        SampleRelTableKeyConverterFactory.class.getName());
-
-    String configAvroRelSchemaProviderDomain =
-        String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
-    staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        ConfigBasedAvroRelSchemaProviderFactory.class.getName());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro2", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "SIMPLE2"), SimpleRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "SIMPLE3"), SimpleRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "simpleOutputTopic"), SimpleRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "Profile"), ComplexRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "PROFILE"), Profile.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "PAGEVIEW"), PageView.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "COMPANY"), Company.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "enrichedPageViewTopic"), EnrichedPageView.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-        "testavro", "pageViewCountTopic"), PageViewCount.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-            TEST_REMOTE_STORE_SYSTEM, "testTable"), SimpleRecord.SCHEMA$.toString());
-
-    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
-            TEST_REMOTE_STORE_SYSTEM, "Profile"), Profile.SCHEMA$.toString());
-
-    staticConfigs.putAll(props);
-
-    return staticConfigs;
-  }
-}