You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/01/24 00:51:38 UTC
[1/2] samza git commit: SAMZA-2084: [SAMZA SQL] add logging when
exceptions are swallowed and rename util package
Repository: samza
Updated Branches:
refs/heads/master 4a08a23ed -> 6a1e85eef
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
deleted file mode 100644
index 618ca50..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.testutil;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.ListGauge;
-import org.apache.samza.metrics.Timer;
-
-
-/**
- * TestMetricsRegistryImpl implements the MetricRegistry interface and adds get APIs
- * for testing Translators.
- */
-public class TestMetricsRegistryImpl implements org.apache.samza.metrics.MetricsRegistry {
- private Map<String, List<Counter>> counters = new HashMap<>();
- private Map<String, List<Timer>> timers = new HashMap<>();
- private Map<String, List<Gauge<?>>> gauges = new HashMap<>();
- private Map<String, List<ListGauge>> listGauges = new HashMap<>();
-
- @Override
- public Counter newCounter(String group, String name) {
- Counter counter = new Counter(name);
- return newCounter(group, counter);
- }
-
- @Override
- public Counter newCounter(String group, Counter counter) {
- if (!counters.containsKey(group)) {
- counters.put(group, new ArrayList<>());
- }
- counters.get(group).add(counter);
- return counter;
- }
-
- /**
- * retrieves the Map of Counters
- * @return counters
- */
- public Map<String, List<Counter>> getCounters() {
- return counters;
- }
-
- @Override
- public Timer newTimer(String group, String name) {
- Timer timer = new Timer(name);
- return newTimer(group, timer);
- }
-
- @Override
- public Timer newTimer(String group, Timer timer) {
- if (!timers.containsKey(group)) {
- timers.put(group, new ArrayList<>());
- }
- timers.get(group).add(timer);
- return timer;
- }
-
- /**
- * retrieves the Map of Timers
- * @return timers
- */
- public Map<String, List<Timer>> getTimers() {
- return timers;
- }
-
- @Override
- public <T> Gauge<T> newGauge(String group, String name, T value) {
- Gauge<T> gauge = new Gauge<>(name, value);
- return newGauge(group, gauge);
- }
-
- @Override
- public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
- if (!gauges.containsKey(group)) {
- gauges.put(group, new ArrayList<>());
- }
- gauges.get(group).add(gauge);
- return gauge;
- }
-
- /**
- * retrieves the Map of Gauges
- * @return gauges
- */
- public Map<String, List<Gauge<?>>> getGauges() {
- return gauges;
- }
-
- @Override
- public ListGauge newListGauge(String group, ListGauge listGauge) {
- listGauges.putIfAbsent(group, new ArrayList());
- listGauges.get(group).add(listGauge);
- return listGauge;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
deleted file mode 100644
index dd98b92..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSamzaSqlFileParser {
-
- public static final String TEST_SQL =
- "insert into log.outputStream \n" + "\tselect * from brooklin.elasticsearchEnterpriseAccounts\n"
- + "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, "
- + "description as name3 from brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream \n"
- + "insert into log.outputstream \n" + "\n" + "\tselect id, MyTest(id) as id2 \n" + "\n"
- + "\tfrom tracking.SamzaSqlTestTopic1_p8";
-
- @Test
- public void testParseSqlFile() throws IOException {
- File tempFile = File.createTempFile("testparser", "");
- PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath());
- fileWriter.println(TEST_SQL);
- fileWriter.close();
-
- List<String> sqlStmts = SqlFileParser.parseSqlFile(tempFile.getAbsolutePath());
- Assert.assertEquals(3, sqlStmts.size());
- Assert.assertEquals("insert into log.outputStream select * from brooklin.elasticsearchEnterpriseAccounts",
- sqlStmts.get(0));
- Assert.assertEquals(
- "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, description as name3 from brooklin.elasticsearchEnterpriseAccounts",
- sqlStmts.get(1));
- Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as id2 from tracking.SamzaSqlTestTopic1_p8",
- sqlStmts.get(2));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
deleted file mode 100644
index be1e317..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-public class TestSamzaSqlQueryParser {
-
- @Test
- public void testParseQuery() {
- QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
- Assert.assertEquals("log.foo", queryInfo.getSink());
- Assert.assertEquals(queryInfo.getSelectQuery(), "select * from tracking.bar", queryInfo.getSelectQuery());
- Assert.assertEquals(1, queryInfo.getSources().size());
- Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
- }
-
- @Test
- public void testParseJoinQuery() {
- String sql =
- "Insert into testavro.enrichedPageViewTopic"
- + " select p.name as profileName, pv.pageKey"
- + " from testavro.PAGEVIEW as pv"
- + " join testavro.PROFILE.`$table` as p"
- + " on p.id = pv.profileId";
- QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
- Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
- Assert.assertEquals(2, queryInfo.getSources().size());
- Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
- Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
- }
-
- @Test
- public void testParseInvalidQuery() {
-
- try {
- SamzaSqlQueryParser.parseQuery("select * from tracking.bar");
- Assert.fail("Expected a samzaException");
- } catch (SamzaException e) {
- }
-
- try {
- SamzaSqlQueryParser.parseQuery("insert into select * from tracking.bar");
- Assert.fail("Expected a samzaException");
- } catch (SamzaException e) {
- }
-
- try {
- SamzaSqlQueryParser.parseQuery("insert into log.off select from tracking.bar");
- Assert.fail("Expected a samzaException");
- } catch (SamzaException e) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index e804113..037201e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -20,7 +20,6 @@
package org.apache.samza.sql.translator;
import java.io.IOException;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -41,7 +40,7 @@ import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
+import org.apache.samza.sql.util.TestMetricsRegistryImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 233fca4..e8f2d1f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -38,13 +38,11 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
+import org.apache.samza.sql.util.TestMetricsRegistryImpl;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 29ce6d0..050971f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -19,7 +19,6 @@
package org.apache.samza.sql.translator;
import java.io.IOException;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -50,8 +49,7 @@ import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
-import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.sql.util.TestMetricsRegistryImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index c81a1fc..efe3896 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -37,10 +37,10 @@ import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.TestMetricsRegistryImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
index 4e97ced..2deb6ad 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRemoteTableJoinFunction.java
@@ -19,7 +19,6 @@
package org.apache.samza.sql.translator;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -37,7 +36,7 @@ import org.apache.samza.sql.avro.schemas.SimpleRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
-import org.apache.samza.sql.testutil.SampleRelTableKeyConverter;
+import org.apache.samza.sql.util.SampleRelTableKeyConverter;
import org.apache.samza.system.SystemStream;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestArrayUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestArrayUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestArrayUdf.java
new file mode 100644
index 0000000..c71813b
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestArrayUdf.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.util;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.SamzaSqlUdf;
+import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+/**
+ * Test UDF registered under the name {@code MyTestArray}; it expands an integer
+ * argument into a list of decimal strings.
+ */
+@SamzaSqlUdf(name = "MyTestArray")
+public class MyTestArrayUdf implements ScalarUdf {
+
+  /**
+   * No-op: this UDF reads nothing from its configuration.
+   *
+   * @param udfConfig ignored
+   */
+  @Override
+  public void init(Config udfConfig) {
+  }
+
+  /**
+   * Lists the integers from 0 (inclusive) up to the first argument (exclusive) as strings.
+   *
+   * @param args UDF arguments; only {@code args[0]} is read and it is cast to {@link Integer}
+   * @return the decimal string form of each integer in ascending order
+   */
+  @SamzaSqlUdfMethod
+  public List<String> execute(Object... args) {
+    int upperBound = (Integer) args[0];
+    return IntStream.range(0, upperBound)
+        .mapToObj(index -> String.valueOf(index))
+        .collect(Collectors.toList());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
new file mode 100644
index 0000000..6b714b4
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.util;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.SamzaSqlUdf;
+import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
+import org.apache.samza.sql.udfs.ScalarUdf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scalar test UDF, registered as {@code MyTest}, used by unit and integration tests.
+ * It doubles its integer argument.
+ */
+@SamzaSqlUdf(name = "MyTest")
+public class MyTestUdf implements ScalarUdf {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MyTestUdf.class);
+
+  /**
+   * Doubles the first argument.
+   *
+   * @param value UDF arguments; only {@code value[0]} is read and it is cast to {@link Integer}
+   * @return twice {@code value[0]}
+   */
+  @SamzaSqlUdfMethod
+  public Integer execute(Object... value) {
+    Integer input = (Integer) value[0];
+    return 2 * input;
+  }
+
+  /**
+   * Logs the configuration this UDF was initialized with; nothing is retained.
+   *
+   * @param udfConfig configuration passed in by the caller
+   */
+  @Override
+  public void init(Config udfConfig) {
+    LOG.info("Init called with {}", udfConfig);
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/RemoteStoreIOResolverTestFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/RemoteStoreIOResolverTestFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/util/RemoteStoreIOResolverTestFactory.java
new file mode 100644
index 0000000..f5ee75e
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/RemoteStoreIOResolverTestFactory.java
@@ -0,0 +1,185 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
+
+
+/**
+ * Test {@link SqlIOResolverFactory} whose resolver backs remote tables with an in-memory map.
+ */
+public class RemoteStoreIOResolverTestFactory implements SqlIOResolverFactory {
+  public static final String TEST_REMOTE_STORE_SYSTEM = "testRemoteStore";
+  public static final String TEST_TABLE_ID = "testTableId";
+
+  // Backing store shared by every read/write function below; entries are keyed by key.toString().
+  // The former "transient" modifier was dropped: it has no effect on a static field.
+  public static Map<Object, Object> records = new HashMap<>();
+
+  /**
+   * Creates a resolver over {@code config}.
+   *
+   * @param config configuration the resolver reads per-system settings and the metadata topic prefix from
+   * @param fullConfig unused
+   * @return a new resolver
+   */
+  @Override
+  public SqlIOResolver create(Config config, Config fullConfig) {
+    return new TestRemoteStoreIOResolver(config);
+  }
+
+  /** Write function that updates {@link RemoteStoreIOResolverTestFactory#records} and completes immediately. */
+  public static class InMemoryWriteFunction implements TableWriteFunction<Object, Object> {
+
+    @Override
+    public CompletableFuture<Void> putAsync(Object key, Object record) {
+      records.put(key.toString(), record);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(Object key) {
+      records.remove(key.toString());
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  /** Read function that looks up {@link RemoteStoreIOResolverTestFactory#records} and completes immediately. */
+  static class InMemoryReadFunction implements TableReadFunction<Object, Object> {
+
+    @Override
+    public CompletableFuture<Object> getAsync(Object key) {
+      return CompletableFuture.completedFuture(records.get(key.toString()));
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  /**
+   * Resolves dot-separated source and sink names into {@link SqlIOConfig}s. A name whose last
+   * component is {@code $table} also gets a table descriptor, which is cached per name.
+   * This class is static because it uses no state of the enclosing factory instance.
+   */
+  private static class TestRemoteStoreIOResolver implements SqlIOResolver {
+    private static final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
+    private final Config config;
+    private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
+    private final String changeLogStorePrefix;
+
+    public TestRemoteStoreIOResolver(Config config) {
+      this.config = config;
+      String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+      this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
+    }
+
+    /**
+     * Builds the {@link SqlIOConfig} for {@code ioName}.
+     *
+     * @param ioName dot-separated name; the first component is used as the system name
+     * @param isSink whether the name is being resolved as a sink
+     * @return the I/O config; its table descriptor is {@code null} unless the name ends in {@code $table}
+     */
+    private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
+      String[] sourceComponents = ioName.split("\\.");
+      int systemIdx = 0;
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+      TableDescriptor tableDescriptor = null;
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        // The stream name is the component just before the trailing "$table".
+        streamIdx = endIdx - 1;
+
+        tableDescriptor = tableDescMap.get(ioName);
+        if (tableDescriptor == null) {
+          tableDescriptor = createTableDescriptor(ioName, sourceComponents[systemIdx], isSink);
+          tableDescMap.put(ioName, tableDescriptor);
+        }
+      }
+
+      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
+      return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
+          Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
+    }
+
+    /**
+     * Creates the descriptor for a {@code $table} name: a read/write remote table for a sink, a read-only
+     * remote table for a source in {@code TEST_REMOTE_STORE_SYSTEM}, and a local RocksDB table otherwise.
+     */
+    private TableDescriptor createTableDescriptor(String ioName, String systemName, boolean isSink) {
+      // '.' and '$' are replaced with '-' when deriving a table id from the I/O name.
+      String idSuffix = ioName.replace(".", "-").replace("$", "-");
+
+      if (isSink) {
+        return new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + idSuffix)
+            .withReadFunction(new InMemoryReadFunction())
+            .withWriteFunction(new InMemoryWriteFunction());
+      }
+      if (systemName.equals(TEST_REMOTE_STORE_SYSTEM)) {
+        return new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + idSuffix)
+            .withReadFunction(new InMemoryReadFunction());
+      }
+
+      // A local table
+      String tableId = changeLogStorePrefix + "InputTable-" + idSuffix;
+      SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
+          (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
+      SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+          (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+      return new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
+    }
+
+    @Override
+    public SqlIOConfig fetchSourceInfo(String sourceName) {
+      return fetchIOInfo(sourceName, false);
+    }
+
+    @Override
+    public SqlIOConfig fetchSinkInfo(String sinkName) {
+      return fetchIOInfo(sinkName, true);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelConverterFactory.java
new file mode 100644
index 0000000..5c6b31f
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelConverterFactory.java
@@ -0,0 +1,73 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Test-only {@link SamzaRelConverterFactory} whose converters are {@link AvroRelConverter}s that
+ * report alternate messages as system messages. This is used purely for testing system messages.
+ */
+public class SampleRelConverterFactory implements SamzaRelConverterFactory {
+
+  // Number of isSystemMessage calls so far; shared by every converter this factory creates.
+  private int isSystemMessageCalls = 0;
+
+  /**
+   * Creates a {@link SampleRelConverter} for the given stream.
+   *
+   * @param systemStream stream handed to the converter
+   * @param relSchemaProvider schema provider; it is cast to {@link AvroRelSchemaProvider}
+   * @param config configuration handed to the converter
+   * @return a new {@link SampleRelConverter}
+   */
+  @Override
+  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config) {
+    AvroRelSchemaProvider avroSchemaProvider = (AvroRelSchemaProvider) relSchemaProvider;
+    return new SampleRelConverter(systemStream, avroSchemaProvider, config);
+  }
+
+  /**
+   * An {@link AvroRelConverter} that flags every other message as a system message.
+   */
+  public class SampleRelConverter extends AvroRelConverter {
+    public SampleRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
+      super(systemStream, schemaProvider, config);
+    }
+
+    /**
+     * Answers {@code true} when the factory-wide call count is even, then increments it; {@code kv} is not read.
+     */
+    @Override
+    public boolean isSystemMessage(KV<Object, Object> kv) {
+      boolean systemMessage = isSystemMessageCalls % 2 == 0;
+      isSystemMessageCalls++;
+      return systemMessage;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverter.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverter.java
new file mode 100644
index 0000000..1589995
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverter.java
@@ -0,0 +1,49 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+
+
+/**
+ * A sample {@link SamzaRelTableKeyConverter} used in tests to convert the join key to table format.
+ */
+public class SampleRelTableKeyConverter implements SamzaRelTableKeyConverter {
+
+  /**
+   * Returns the string form of the first field of the join key. If that first field is itself a
+   * {@link SamzaSqlRelRecord}, the first field of the nested record is used instead.
+   *
+   * @param relRecord join key record; it must have at least one field
+   * @return {@code toString()} of the selected first field value
+   */
+  @Override
+  public Object convertToTableKeyFormat(SamzaSqlRelRecord relRecord) {
+    SamzaSqlRelRecord keyRecord = relRecord;
+    Object firstField = relRecord.getFieldValues().get(0);
+    if (firstField instanceof SamzaSqlRelRecord) {
+      // The key is wrapped in an outer record; unwrap one level.
+      keyRecord = (SamzaSqlRelRecord) firstField;
+    }
+    // Only the first field is used, so stringify just that one rather than every field.
+    return keyRecord.getFieldValues().get(0).toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverterFactory.java
new file mode 100644
index 0000000..0234527
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SampleRelTableKeyConverterFactory.java
@@ -0,0 +1,40 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.util.HashMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
+import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * A sample {@link SamzaRelTableKeyConverterFactory} used in tests to create {@link SampleRelTableKeyConverter}.
+ */
+public class SampleRelTableKeyConverterFactory implements SamzaRelTableKeyConverterFactory {
+
+ private final HashMap<SystemStream, SamzaRelTableKeyConverter> relConverters = new HashMap<>();
+
+ @Override
+ public SamzaRelTableKeyConverter create(SystemStream systemStream, Config config) {
+ return relConverters.computeIfAbsent(systemStream, ss -> new SampleRelTableKeyConverter());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
new file mode 100644
index 0000000..19a8638
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -0,0 +1,209 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import com.google.common.base.Joiner;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.sql.avro.AvroRelConverterFactory;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.Company;
+import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.EnrichedPageView;
+import org.apache.samza.sql.avro.schemas.PageView;
+import org.apache.samza.sql.avro.schemas.PageViewCount;
+import org.apache.samza.sql.avro.schemas.Profile;
+import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.sql.fn.BuildOutputRecordUdf;
+import org.apache.samza.sql.fn.FlattenUdf;
+import org.apache.samza.sql.fn.RegexMatchUdf;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.system.TestAvroSystemFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+
+import static org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory.TEST_REMOTE_STORE_SYSTEM;
+
+
+/**
+ * Utility to hook up the configs needed to run the Samza Sql application.
+ */
+public class SamzaSqlTestConfig {
+
+ public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
+ public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2";
+ public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
+
+ public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
+ return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
+ }
+
+ public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages) {
+ return fetchStaticConfigsWithFactories(props, numberOfMessages, false);
+ }
+
+ public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+ boolean includeNullForeignKeys) {
+ return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, false, 0);
+ }
+
+ public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+ boolean includeNullForeignKeys, boolean includeNullSimpleRecords) {
+ return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, includeNullSimpleRecords, 0);
+ }
+
+ public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+ boolean includeNullForeignKeys, boolean includeNullSimpleRecords, long windowDurationMs) {
+ HashMap<String, String> staticConfigs = new HashMap<>();
+
+ staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
+ staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
+ staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+ staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
+ String configIOResolverDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+ staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ RemoteStoreIOResolverTestFactory.class.getName());
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
+ String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
+ staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ ConfigBasedUdfResolver.class.getName());
+ staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, Joiner.on(",")
+ .join(MyTestUdf.class.getName(), RegexMatchUdf.class.getName(), FlattenUdf.class.getName(),
+ MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName()));
+
+ String avroSystemConfigPrefix =
+ String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
+ String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
+ staticConfigs.put(avroSystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
+ staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
+ String.valueOf(numberOfMessages));
+ staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
+ includeNullForeignKeys ? "true" : "false");
+ staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_SIMPLE_RECORDS,
+ includeNullSimpleRecords ? "true" : "false");
+ staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
+ String.valueOf(windowDurationMs / 2));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
+ staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+ staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+ String testRemoteStoreSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", TEST_REMOTE_STORE_SYSTEM);
+ staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+ staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_TABLE_KEY_CONVERTER, "sample");
+ staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+ String avro2SystemConfigPrefix =
+ String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO2);
+ String avro2SamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO2);
+ staticConfigs.put(avro2SystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
+ staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
+ String.valueOf(numberOfMessages));
+ staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
+ includeNullForeignKeys ? "true" : "false");
+ staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
+ String.valueOf(windowDurationMs / 2));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
+ staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+ staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+ String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB);
+ staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+ staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+ String avroSamzaToRelMsgConverterDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
+ staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ AvroRelConverterFactory.class.getName());
+
+ String testRemoteStoreSamzaToRelMsgConverterDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, TEST_REMOTE_STORE_SYSTEM);
+ staticConfigs.put(testRemoteStoreSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ AvroRelConverterFactory.class.getName());
+
+ String testRemoteStoreSamzaRelTableKeyConverterDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN, "sample");
+ staticConfigs.put(testRemoteStoreSamzaRelTableKeyConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ SampleRelTableKeyConverterFactory.class.getName());
+
+ String configAvroRelSchemaProviderDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
+ staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ ConfigBasedAvroRelSchemaProviderFactory.class.getName());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro2", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "SIMPLE2"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "SIMPLE3"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "simpleOutputTopic"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "Profile"), ComplexRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "PROFILE"), Profile.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "PAGEVIEW"), PageView.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "COMPANY"), Company.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "enrichedPageViewTopic"), EnrichedPageView.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "pageViewCountTopic"), PageViewCount.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ TEST_REMOTE_STORE_SYSTEM, "testTable"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ TEST_REMOTE_STORE_SYSTEM, "Profile"), Profile.SCHEMA$.toString());
+
+ staticConfigs.putAll(props);
+
+ return staticConfigs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
new file mode 100644
index 0000000..4dd4dd8
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.metrics.Timer;
+
+
+/**
+ * TestMetricsRegistryImpl implements the MetricsRegistry interface and adds get APIs
+ * for testing Translators.
+ */
+public class TestMetricsRegistryImpl implements org.apache.samza.metrics.MetricsRegistry {
+ private Map<String, List<Counter>> counters = new HashMap<>();
+ private Map<String, List<Timer>> timers = new HashMap<>();
+ private Map<String, List<Gauge<?>>> gauges = new HashMap<>();
+ private Map<String, List<ListGauge>> listGauges = new HashMap<>();
+
+ @Override
+ public Counter newCounter(String group, String name) {
+ Counter counter = new Counter(name);
+ return newCounter(group, counter);
+ }
+
+ @Override
+ public Counter newCounter(String group, Counter counter) {
+ if (!counters.containsKey(group)) {
+ counters.put(group, new ArrayList<>());
+ }
+ counters.get(group).add(counter);
+ return counter;
+ }
+
+ /**
+ * retrieves the Map of Counters
+ * @return counters
+ */
+ public Map<String, List<Counter>> getCounters() {
+ return counters;
+ }
+
+ @Override
+ public Timer newTimer(String group, String name) {
+ Timer timer = new Timer(name);
+ return newTimer(group, timer);
+ }
+
+ @Override
+ public Timer newTimer(String group, Timer timer) {
+ if (!timers.containsKey(group)) {
+ timers.put(group, new ArrayList<>());
+ }
+ timers.get(group).add(timer);
+ return timer;
+ }
+
+ /**
+ * retrieves the Map of Timers
+ * @return timers
+ */
+ public Map<String, List<Timer>> getTimers() {
+ return timers;
+ }
+
+ @Override
+ public <T> Gauge<T> newGauge(String group, String name, T value) {
+ Gauge<T> gauge = new Gauge<>(name, value);
+ return newGauge(group, gauge);
+ }
+
+ @Override
+ public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
+ if (!gauges.containsKey(group)) {
+ gauges.put(group, new ArrayList<>());
+ }
+ gauges.get(group).add(gauge);
+ return gauge;
+ }
+
+ /**
+ * retrieves the Map of Gauges
+ * @return gauges
+ */
+ public Map<String, List<Gauge<?>>> getGauges() {
+ return gauges;
+ }
+
+ @Override
+ public ListGauge newListGauge(String group, ListGauge listGauge) {
+ listGauges.putIfAbsent(group, new ArrayList());
+ listGauges.get(group).add(listGauge);
+ return listGauge;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlFileParser.java
new file mode 100644
index 0000000..d5372ea
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlFileParser.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSamzaSqlFileParser {
+
+ public static final String TEST_SQL =
+ "insert into log.outputStream \n" + "\tselect * from brooklin.elasticsearchEnterpriseAccounts\n"
+ + "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, "
+ + "description as name3 from brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream \n"
+ + "insert into log.outputstream \n" + "\n" + "\tselect id, MyTest(id) as id2 \n" + "\n"
+ + "\tfrom tracking.SamzaSqlTestTopic1_p8";
+
+ @Test
+ public void testParseSqlFile() throws IOException {
+ File tempFile = File.createTempFile("testparser", "");
+ PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath());
+ fileWriter.println(TEST_SQL);
+ fileWriter.close();
+
+ List<String> sqlStmts = SqlFileParser.parseSqlFile(tempFile.getAbsolutePath());
+ Assert.assertEquals(3, sqlStmts.size());
+ Assert.assertEquals("insert into log.outputStream select * from brooklin.elasticsearchEnterpriseAccounts",
+ sqlStmts.get(0));
+ Assert.assertEquals(
+ "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, description as name3 from brooklin.elasticsearchEnterpriseAccounts",
+ sqlStmts.get(1));
+ Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as id2 from tracking.SamzaSqlTestTopic1_p8",
+ sqlStmts.get(2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
new file mode 100644
index 0000000..f15c2f6
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class TestSamzaSqlQueryParser {
+
+ @Test
+ public void testParseQuery() {
+ QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
+ Assert.assertEquals("log.foo", queryInfo.getSink());
+ Assert.assertEquals(queryInfo.getSelectQuery(), "select * from tracking.bar", queryInfo.getSelectQuery());
+ Assert.assertEquals(1, queryInfo.getSources().size());
+ Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
+ }
+
+ @Test
+ public void testParseJoinQuery() {
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic"
+ + " select p.name as profileName, pv.pageKey"
+ + " from testavro.PAGEVIEW as pv"
+ + " join testavro.PROFILE.`$table` as p"
+ + " on p.id = pv.profileId";
+ QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+ Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+ Assert.assertEquals(2, queryInfo.getSources().size());
+ Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+ Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
+ }
+
+ @Test
+ public void testParseInvalidQuery() {
+
+ try {
+ SamzaSqlQueryParser.parseQuery("select * from tracking.bar");
+ Assert.fail("Expected a samzaException");
+ } catch (SamzaException e) {
+ }
+
+ try {
+ SamzaSqlQueryParser.parseQuery("insert into select * from tracking.bar");
+ Assert.fail("Expected a samzaException");
+ } catch (SamzaException e) {
+ }
+
+ try {
+ SamzaSqlQueryParser.parseQuery("insert into log.off select from tracking.bar");
+ Assert.fail("Expected a samzaException");
+ } catch (SamzaException e) {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 259bfbf..76119e4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -36,10 +36,10 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.serializers.JsonSerdeV2Factory;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.MyTestUdf;
-import org.apache.samza.sql.testutil.SampleRelConverterFactory;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.MyTestUdf;
+import org.apache.samza.sql.util.SampleRelConverterFactory;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.junit.Assert;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index c075677..2c9556f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -28,9 +28,9 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
-import org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
index bc44470..ab60156 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
@@ -43,8 +43,8 @@ import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SqlFileParser;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.SqlFileParser;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.kafka.KafkaSystemFactory;
import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
index 9392aab..5cbe0cc 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
@@ -33,7 +33,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.util.ReflectionUtils;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.tools.CommandLineHelper;
[2/2] samza git commit: SAMZA-2084: [SAMZA SQL] add logging when
exceptions are swallowed and rename util package
Posted by sr...@apache.org.
SAMZA-2084: [SAMZA SQL] add logging when exceptions are swallowed and rename util package
## What changes were proposed in this pull request?
This PR is to resolve:
1) Samza Sql Shell: add logging when exceptions are swallowed
2) Samza Sql: rename the package name of "[testutil](https://github.com/apache/samza/tree/master/samza-sql/src/main/java/org/apache/samza/sql/testutil)" to "util".
## How was this patch tested?
Pass the current unit tests.
Author: Weiqing Yang <ya...@gmail.com>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>
Closes #891 from weiqingy/SAMZA-2084
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a1e85ee
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a1e85ee
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a1e85ee
Branch: refs/heads/master
Commit: 6a1e85eef298e3587b277bf5745d0b1d9962e4ec
Parents: 4a08a23
Author: Weiqing Yang <ya...@gmail.com>
Authored: Wed Jan 23 16:51:33 2019 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Wed Jan 23 16:51:33 2019 -0800
----------------------------------------------------------------------
.../samza/sql/client/impl/SamzaExecutor.java | 42 ++--
.../sql/data/SamzaSqlExecutionContext.java | 2 +-
.../samza/sql/dsl/SamzaSqlDslConverter.java | 4 +-
.../sql/runner/SamzaSqlApplicationConfig.java | 6 +-
.../apache/samza/sql/testutil/ConfigUtil.java | 62 ------
.../org/apache/samza/sql/testutil/JsonUtil.java | 91 --------
.../samza/sql/testutil/ReflectionUtils.java | 62 ------
.../samza/sql/testutil/SamzaSqlQueryParser.java | 196 -----------------
.../samza/sql/testutil/SqlFileParser.java | 103 ---------
.../samza/sql/translator/QueryTranslator.java | 2 +-
.../org/apache/samza/sql/util/ConfigUtil.java | 62 ++++++
.../org/apache/samza/sql/util/JsonUtil.java | 91 ++++++++
.../apache/samza/sql/util/ReflectionUtils.java | 62 ++++++
.../samza/sql/util/SamzaSqlQueryParser.java | 196 +++++++++++++++++
.../apache/samza/sql/util/SqlFileParser.java | 103 +++++++++
.../runner/TestSamzaSqlApplicationConfig.java | 6 +-
.../runner/TestSamzaSqlApplicationRunner.java | 2 +-
.../samza/sql/testutil/MyTestArrayUdf.java | 42 ----
.../apache/samza/sql/testutil/MyTestUdf.java | 50 -----
.../RemoteStoreIOResolverTestFactory.java | 147 -------------
.../sql/testutil/SampleRelConverterFactory.java | 56 -----
.../testutil/SampleRelTableKeyConverter.java | 39 ----
.../SampleRelTableKeyConverterFactory.java | 41 ----
.../samza/sql/testutil/SamzaSqlTestConfig.java | 209 -------------------
.../sql/testutil/TestMetricsRegistryImpl.java | 117 -----------
.../sql/testutil/TestSamzaSqlFileParser.java | 56 -----
.../sql/testutil/TestSamzaSqlQueryParser.java | 75 -------
.../sql/translator/TestFilterTranslator.java | 3 +-
.../sql/translator/TestJoinTranslator.java | 4 +-
.../sql/translator/TestProjectTranslator.java | 4 +-
.../sql/translator/TestQueryTranslator.java | 8 +-
.../TestSamzaSqlRemoteTableJoinFunction.java | 3 +-
.../apache/samza/sql/util/MyTestArrayUdf.java | 42 ++++
.../org/apache/samza/sql/util/MyTestUdf.java | 49 +++++
.../util/RemoteStoreIOResolverTestFactory.java | 147 +++++++++++++
.../sql/util/SampleRelConverterFactory.java | 56 +++++
.../sql/util/SampleRelTableKeyConverter.java | 39 ++++
.../util/SampleRelTableKeyConverterFactory.java | 40 ++++
.../samza/sql/util/SamzaSqlTestConfig.java | 209 +++++++++++++++++++
.../samza/sql/util/TestMetricsRegistryImpl.java | 117 +++++++++++
.../samza/sql/util/TestSamzaSqlFileParser.java | 56 +++++
.../samza/sql/util/TestSamzaSqlQueryParser.java | 75 +++++++
.../test/samzasql/TestSamzaSqlEndToEnd.java | 8 +-
.../test/samzasql/TestSamzaSqlRemoteTable.java | 6 +-
.../org/apache/samza/tools/SamzaSqlConsole.java | 4 +-
.../tools/benchmark/AbstractSamzaBench.java | 2 +-
46 files changed, 1398 insertions(+), 1398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 1149364..7df79c9 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -31,7 +31,6 @@ import org.apache.samza.config.*;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.serializers.StringSerdeFactory;
-import org.apache.samza.sql.avro.AvroRelSchemaProvider;
import org.apache.samza.sql.client.interfaces.*;
import org.apache.samza.sql.client.util.RandomAccessQueue;
import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
@@ -49,7 +48,7 @@ import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
import org.apache.samza.sql.schema.SqlSchema;
-import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.util.JsonUtil;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.kafka.KafkaSystemFactory;
@@ -126,8 +125,9 @@ public class SamzaExecutor implements SqlExecutor {
.map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
.collect(Collectors.toList());
} catch (ZkTimeoutException ex) {
- lastErrorMsg = ex.toString();
- LOG.error(lastErrorMsg);
+ String msg = "listTables failed with exception ";
+ lastErrorMsg = msg + ex.toString();
+ LOG.error(msg, ex);
}
return tables;
}
@@ -151,8 +151,9 @@ public class SamzaExecutor implements SqlExecutor {
(o, c) -> ((RelSchemaProviderFactory) o).create(sourceInfo.getSystemStream(), c));
sqlSchema = schemaProvider.getSqlSchema();
} catch (SamzaException ex) {
- lastErrorMsg = ex.toString();
- LOG.error(lastErrorMsg);
+ String msg = "getTableSchema failed with exception ";
+ lastErrorMsg = msg + ex.toString();
+ LOG.error(msg, ex);
}
return sqlSchema;
}
@@ -172,8 +173,9 @@ public class SamzaExecutor implements SqlExecutor {
runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
runner.run(null);
} catch (SamzaException ex) {
- lastErrorMsg = ex.toString();
- LOG.error(lastErrorMsg);
+ String msg = "Execution failed with exception ";
+ lastErrorMsg = msg + ex.toString();
+ LOG.error(msg, ex);
return new QueryResult(execId, null, false);
}
executions.put(execId, runner);
@@ -214,8 +216,9 @@ public class SamzaExecutor implements SqlExecutor {
try {
executedStmts = Files.lines(Paths.get(sqlFile.getPath())).collect(Collectors.toList());
} catch (IOException e) {
- lastErrorMsg = String.format("Unable to parse the sql file %s. %s", sqlFile.getPath(), e.toString());
- LOG.error(lastErrorMsg);
+ String msg = "Unable to parse the sql file " + sqlFile.getAbsolutePath();
+ lastErrorMsg = msg + e.toString();
+ LOG.error(msg, e);
return new NonQueryResult(-1, false);
}
LOG.info("Sql statements in Sql file: " + executedStmts.toString());
@@ -245,8 +248,9 @@ public class SamzaExecutor implements SqlExecutor {
runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
runner.run(null);
} catch (SamzaException ex) {
- lastErrorMsg = ex.toString();
- LOG.error(lastErrorMsg);
+ String msg = "Execution of the query failed with exception ";
+ lastErrorMsg = msg + ex.toString();
+ LOG.error(msg, ex);
return new NonQueryResult(execId, false);
}
executions.put(execId, runner);
@@ -265,9 +269,10 @@ public class SamzaExecutor implements SqlExecutor {
try {
runner.kill();
- } catch (SamzaException ex) {
- lastErrorMsg = ex.toString();
- LOG.debug(lastErrorMsg);
+ } catch (SamzaException ex) {
+ String msg = "Stopping execution failed with exception ";
+ lastErrorMsg = msg + ex.toString();
+ LOG.warn(msg, ex);
return false;
}
@@ -489,15 +494,18 @@ public class SamzaExecutor implements SqlExecutor {
}
private String getPrettyFormat(OutgoingMessageEnvelope envelope) {
+ lastErrorMsg = "";
String value = new String((byte[]) envelope.getMessage());
ObjectMapper mapper = new ObjectMapper();
String formattedValue;
try {
Object json = mapper.readValue(value, Object.class);
formattedValue = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
- } catch (IOException e) {
+ } catch (IOException ex) {
formattedValue = value;
- LOG.error("Error while formatting json", e);
+ String msg = "getPrettyFormat failed with exception while formatting json ";
+ lastErrorMsg = msg + ex.toString();
+ LOG.error(msg, ex);
}
return formattedValue;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
index cb5c7a7..091ca62 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
@@ -28,7 +28,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.sql.interfaces.UdfMetadata;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.util.ReflectionUtils;
import org.apache.samza.sql.udfs.ScalarUdf;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index d4cb134..ea0ebfa 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -32,8 +32,8 @@ import org.apache.samza.config.Config;
import org.apache.samza.sql.interfaces.DslConverter;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SqlFileParser;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SqlFileParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index b2a5efe..4883dfb 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -55,9 +55,9 @@ import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.UdfMetadata;
import org.apache.samza.sql.interfaces.UdfResolver;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.ReflectionUtils;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.ReflectionUtils;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
deleted file mode 100644
index 8bbab86..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ConfigUtil.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-
-
-/**
- * Utility methods to aid with config management.
- */
-public class ConfigUtil {
-
- private ConfigUtil() {
- }
-
- /**
- * Method is used to filter just the properties with the prefix.
- * @param props Full set of properties
- * @param prefix Prefix of the keys that in the config that needs to be filtered out.
- * @param preserveFullKey If set to true, after filtering, preserves the full key including the prefix.
- * If set to false, Strips out the prefix from the key before returning.
- * @return Returns the filtered set of properties matching the prefix from the input property bag.
- */
- public static Properties getDomainProperties(Properties props, String prefix, boolean preserveFullKey) {
- String fullPrefix;
- if (StringUtils.isBlank(prefix)) {
- fullPrefix = ""; // this will effectively retrieve all properties
- } else {
- fullPrefix = prefix.endsWith(".") ? prefix : prefix + ".";
- }
- Properties ret = new Properties();
- props.keySet().stream().map(String.class::cast).forEach(keyStr -> {
- if (keyStr.startsWith(fullPrefix) && !keyStr.equals(fullPrefix)) {
- if (preserveFullKey) {
- ret.put(keyStr, props.get(keyStr));
- } else {
- ret.put(keyStr.substring(fullPrefix.length()), props.get(keyStr));
- }
- }
- });
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
deleted file mode 100644
index ea78f41..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/JsonUtil.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.io.IOException;
-import java.io.StringWriter;
-
-import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility methods to aid serialization and deserialization of Json.
- */
-public class JsonUtil {
-
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
- private static final Logger LOG = LoggerFactory.getLogger(JsonUtil.class.getName());
-
- static {
- MAPPER.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- private JsonUtil() {
- }
-
- /**
- * Deserialize a JSON string into an object based on a type reference.
- * This method allows the caller to specify precisely the desired output
- * type for the target object.
- * @param json JSON string
- * @param typeRef type reference of the target object
- * @param <T> type of the target object
- * @return deserialized Java object
- */
- public static <T> T fromJson(String json, TypeReference<T> typeRef) {
- Validate.notNull(json, "null JSON string");
- Validate.notNull(typeRef, "null type reference");
- T object;
- try {
- object = MAPPER.readValue(json, typeRef);
- } catch (IOException e) {
- String errorMessage = "Failed to parse json: " + json;
- LOG.error(errorMessage, e);
- throw new SamzaException(errorMessage, e);
- }
- return object;
- }
-
- /**
- * Serialize a Java object into JSON string.
- * @param object object to be serialized
- * @param <T> type of the input object
- * @return JSON string
- */
- public static <T> String toJson(T object) {
- Validate.notNull(object, "null input object");
- StringWriter out = new StringWriter();
- try {
- MAPPER.writeValue(out, object);
- } catch (IOException e) {
- String errorMessage = "Failed to serialize object: " + object;
- LOG.error(errorMessage, e);
- throw new SamzaException(errorMessage, e);
- }
- return out.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
deleted file mode 100644
index 4293c76..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/ReflectionUtils.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.lang.reflect.Constructor;
-import java.util.stream.IntStream;
-
-import org.apache.commons.lang.Validate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class to simplify usage of Java reflection.
- */
-public class ReflectionUtils {
- private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class);
-
- private ReflectionUtils() {
-
- }
-
- /**
- * Create an instance of the specified class with constuctor
- * matching the argument array.
- * @param clazz name of the class
- * @param args argument array
- * @param <T> type fo the class
- * @return instance of the class, or null if anything went wrong
- */
- @SuppressWarnings("unchecked")
- public static <T> T createInstance(String clazz, Object... args) {
- Validate.notNull(clazz, "null class name");
- try {
- Class<T> classObj = (Class<T>) Class.forName(clazz);
- Class<?>[] argTypes = new Class<?>[args.length];
- IntStream.range(0, args.length).forEach(i -> argTypes[i] = args[i].getClass());
- Constructor<T> ctor = classObj.getDeclaredConstructor(argTypes);
- return ctor.newInstance(args);
- } catch (Exception e) {
- LOG.warn("Failed to create instance for: " + clazz, e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
deleted file mode 100644
index 643c82f..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlAsOperator;
-import org.apache.calcite.sql.SqlBasicCall;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlJoin;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.SqlUnnestOperator;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.samza.SamzaException;
-import org.apache.samza.sql.interfaces.SamzaSqlDriver;
-import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
-
-
-/**
- * Utility class that is used to parse the Samza sql query to figure out the sources, sink etc..
- */
-public class SamzaSqlQueryParser {
-
- private SamzaSqlQueryParser() {
- }
-
- public static class QueryInfo {
- private final List<String> sources;
- private String selectQuery;
- private String sink;
- private String sql;
-
- public QueryInfo(String selectQuery, List<String> sources, String sink, String sql) {
- this.selectQuery = selectQuery;
- this.sink = sink;
- this.sources = sources;
- this.sql = sql;
- }
-
- public List<String> getSources() {
- return sources;
- }
-
- public String getSelectQuery() {
- return selectQuery;
- }
-
- public String getSink() {
- return sink;
- }
-
- public String getSql() {
- return sql;
- }
- }
-
- public static QueryInfo parseQuery(String sql) {
-
- Pattern insertIntoSqlPattern = Pattern.compile("insert into (.*) (select .* from (.*))", Pattern.CASE_INSENSITIVE);
- Matcher m = insertIntoSqlPattern.matcher(sql);
- if (!m.matches()) {
- throw new SamzaException("Invalid query format");
- }
-
- Planner planner = createPlanner();
- SqlNode sqlNode;
- try {
- sqlNode = planner.parse(sql);
- } catch (SqlParseException e) {
- throw new SamzaException(e);
- }
-
- String sink;
- String selectQuery;
- ArrayList<String> sources;
- if (sqlNode instanceof SqlInsert) {
- SqlInsert sqlInsert = ((SqlInsert) sqlNode);
- sink = sqlInsert.getTargetTable().toString();
- if (sqlInsert.getSource() instanceof SqlSelect) {
- SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
- selectQuery = m.group(2);
- sources = getSourcesFromSelectQuery(sqlSelect);
- } else {
- throw new SamzaException("Sql query is not of the expected format");
- }
- } else {
- throw new SamzaException("Sql query is not of the expected format");
- }
-
- return new QueryInfo(selectQuery, sources, sink, sql);
- }
-
- private static Planner createPlanner() {
- Connection connection;
- SchemaPlus rootSchema;
- try {
- JavaTypeFactory typeFactory = new SamzaSqlJavaTypeFactoryImpl();
- SamzaSqlDriver driver = new SamzaSqlDriver(typeFactory);
- DriverManager.deregisterDriver(DriverManager.getDriver("jdbc:calcite:"));
- DriverManager.registerDriver(driver);
- connection = driver.connect("jdbc:calcite:", new Properties());
- CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
- rootSchema = calciteConnection.getRootSchema();
- } catch (SQLException e) {
- throw new SamzaException(e);
- }
-
- final List<RelTraitDef> traitDefs = new ArrayList<>();
-
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
-
- FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
- .parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).build())
- .defaultSchema(rootSchema)
- .operatorTable(SqlStdOperatorTable.instance())
- .traitDefs(traitDefs)
- .context(Contexts.EMPTY_CONTEXT)
- .costFactory(null)
- .build();
- return Frameworks.getPlanner(frameworkConfig);
- }
-
- private static ArrayList<String> getSourcesFromSelectQuery(SqlSelect sqlSelect) {
- ArrayList<String> sources = new ArrayList<>();
- getSource(sqlSelect.getFrom(), sources);
- if (sources.size() < 1) {
- throw new SamzaException("Unsupported query " + sqlSelect);
- }
-
- return sources;
- }
-
- private static void getSource(SqlNode node, ArrayList<String> sourceList) {
- if (node instanceof SqlJoin) {
- SqlJoin joinNode = (SqlJoin) node;
- ArrayList<String> sourcesLeft = new ArrayList<>();
- ArrayList<String> sourcesRight = new ArrayList<>();
- getSource(joinNode.getLeft(), sourcesLeft);
- getSource(joinNode.getRight(), sourcesRight);
-
- sourceList.addAll(sourcesLeft);
- sourceList.addAll(sourcesRight);
- } else if (node instanceof SqlIdentifier) {
- sourceList.add(node.toString());
- } else if (node instanceof SqlBasicCall) {
- SqlBasicCall basicCall = ((SqlBasicCall) node);
- if (basicCall.getOperator() instanceof SqlAsOperator) {
- getSource(basicCall.operand(0), sourceList);
- } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
- sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0)));
- }
- } else if (node instanceof SqlSelect) {
- getSource(((SqlSelect) node).getFrom(), sourceList);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
deleted file mode 100644
index ca355ad..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SqlFileParser.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility to read the .sql file and parse out the various sql statements in the file.
- * Right now Samza SQL
- * Samza SQL supports a sql file with multiple SQL statements where each SQL statement can be spread across
- * multiple lines.
- * It supports sql comments where a line starts with "--".
- * It cannot support multiple sql statements in a single line.
- * All the empty lines are ignored
- * All the SQL statements should start with "insert into".
- *
- * e.g. SQL File
- * -- Sample comment
- * insert into log.output1 select * from kafka.input1
- *
- * insert into log.output2
- * select * from kafka.input2
- *
- * -- You may have empty lines in between a single query.
- * insert into log.output3
- *
- * select * from kafka.input3
- *
- * -- Below line which contains multiple sql statements are not supported
- * -- insert into log.output4 select * from kafka.input4 insert into log.output5 select * from kafka.input5
- *
- * -- Below SQL statement is not supported because it doesn't start with insert into
- * -- select * from kafka.input6
- */
-public class SqlFileParser {
-
- private static final String INSERT_CMD = "insert";
- private static final Logger LOG = LoggerFactory.getLogger(SqlFileParser.class);
- private static final String SQL_COMMENT_PREFIX = "--";
-
- private SqlFileParser() {
- }
-
- public static List<String> parseSqlFile(String fileName) {
- Validate.notEmpty(fileName, "fileName cannot be empty.");
- List<String> sqlLines;
- try {
- sqlLines = Files.lines(Paths.get(fileName)).collect(Collectors.toList());
- } catch (IOException e) {
- String msg = String.format("Unable to parse the sql file %s", fileName);
- LOG.error(msg, e);
- throw new SamzaException(msg, e);
- }
- List<String> sqlStmts = new ArrayList<>();
- String lastStatement = "";
- for (String sqlLine : sqlLines) {
- String sql = sqlLine.trim();
- if (sql.toLowerCase().startsWith(INSERT_CMD)) {
- if (StringUtils.isNotEmpty(lastStatement)) {
- sqlStmts.add(lastStatement);
- }
-
- lastStatement = sql;
- } else if (StringUtils.isNotBlank(sql) && !sql.startsWith(SQL_COMMENT_PREFIX)) {
- lastStatement = String.format("%s %s", lastStatement, sql);
- }
- }
-
- if (!StringUtils.isWhitespace(lastStatement)) {
- sqlStmts.add(lastStatement);
- }
- return sqlStmts;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index c8d1edf..0fdfd39 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -61,7 +61,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.table.Table;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/ConfigUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/util/ConfigUtil.java
new file mode 100644
index 0000000..e87ea2f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/ConfigUtil.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
+
+/**
+ * Utility methods to aid with config management.
+ */
+public class ConfigUtil {
+
+ private ConfigUtil() {
+ }
+
+ /**
+ * Method is used to filter just the properties with the prefix.
+ * @param props Full set of properties
+ * @param prefix Prefix of the keys that in the config that needs to be filtered out.
+ * @param preserveFullKey If set to true, after filtering, preserves the full key including the prefix.
+ * If set to false, Strips out the prefix from the key before returning.
+ * @return Returns the filtered set of properties matching the prefix from the input property bag.
+ */
+ public static Properties getDomainProperties(Properties props, String prefix, boolean preserveFullKey) {
+ String fullPrefix;
+ if (StringUtils.isBlank(prefix)) {
+ fullPrefix = ""; // this will effectively retrieve all properties
+ } else {
+ fullPrefix = prefix.endsWith(".") ? prefix : prefix + ".";
+ }
+ Properties ret = new Properties();
+ props.keySet().stream().map(String.class::cast).forEach(keyStr -> {
+ if (keyStr.startsWith(fullPrefix) && !keyStr.equals(fullPrefix)) {
+ if (preserveFullKey) {
+ ret.put(keyStr, props.get(keyStr));
+ } else {
+ ret.put(keyStr.substring(fullPrefix.length()), props.get(keyStr));
+ }
+ }
+ });
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java b/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
new file mode 100644
index 0000000..afd6490
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
@@ -0,0 +1,91 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static helpers for converting between JSON text and Java objects.
+ */
+public class JsonUtil {
+
+  private static final ObjectMapper MAPPER = newMapper();
+
+  private static final Logger LOG = LoggerFactory.getLogger(JsonUtil.class);
+
+  private JsonUtil() {
+  }
+
+  /**
+   * Builds the shared mapper, configured so that unknown JSON properties do not fail deserialization.
+   */
+  private static ObjectMapper newMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    return mapper;
+  }
+
+  /**
+   * Deserialize a JSON string into an object described by a type reference, which lets the
+   * caller state the exact (possibly generic) target type.
+   * A failure is logged and rethrown as a {@link SamzaException} wrapping the original cause.
+   * @param json JSON string, must not be null
+   * @param typeRef type reference of the target object, must not be null
+   * @param <T> type of the target object
+   * @return deserialized Java object
+   */
+  public static <T> T fromJson(String json, TypeReference<T> typeRef) {
+    Validate.notNull(json, "null JSON string");
+    Validate.notNull(typeRef, "null type reference");
+    try {
+      return MAPPER.readValue(json, typeRef);
+    } catch (IOException e) {
+      String errorMessage = "Failed to parse json: " + json;
+      LOG.error(errorMessage, e);
+      throw new SamzaException(errorMessage, e);
+    }
+  }
+
+  /**
+   * Serialize a Java object into a JSON string.
+   * A failure is logged and rethrown as a {@link SamzaException} wrapping the original cause.
+   * @param object object to be serialized, must not be null
+   * @param <T> type of the input object
+   * @return JSON string
+   */
+  public static <T> String toJson(T object) {
+    Validate.notNull(object, "null input object");
+    StringWriter buffer = new StringWriter();
+    try {
+      MAPPER.writeValue(buffer, object);
+      return buffer.toString();
+    } catch (IOException e) {
+      String errorMessage = "Failed to serialize object: " + object;
+      LOG.error(errorMessage, e);
+      throw new SamzaException(errorMessage, e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/ReflectionUtils.java b/samza-sql/src/main/java/org/apache/samza/sql/util/ReflectionUtils.java
new file mode 100644
index 0000000..2ef626c
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/ReflectionUtils.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.lang.reflect.Constructor;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper for instantiating classes through Java reflection.
+ */
+public class ReflectionUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class);
+
+  private ReflectionUtils() {
+  }
+
+  /**
+   * Create an instance of the named class using the declared constructor whose parameter
+   * types are exactly the runtime classes of the supplied arguments.
+   * Any exception raised along the way (class lookup, constructor lookup, instantiation,
+   * a null entry in the argument array) is logged at warn level and turned into a null return.
+   * @param clazz name of the class
+   * @param args argument array
+   * @param <T> type of the class
+   * @return instance of the class, or null if anything went wrong
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T createInstance(String clazz, Object... args) {
+    Validate.notNull(clazz, "null class name");
+    try {
+      Class<T> target = (Class<T>) Class.forName(clazz);
+      // The constructor lookup is by exact runtime class of each argument.
+      Class<?>[] parameterTypes = IntStream.range(0, args.length)
+          .mapToObj(idx -> args[idx].getClass())
+          .toArray(Class<?>[]::new);
+      Constructor<T> matching = target.getDeclaredConstructor(parameterTypes);
+      return matching.newInstance(args);
+    } catch (Exception e) {
+      LOG.warn("Failed to create instance for: " + clazz, e);
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
new file mode 100644
index 0000000..e81abfa
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -0,0 +1,196 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlAsOperator;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlUnnestOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.interfaces.SamzaSqlDriver;
+import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
+
+
+/**
+ * Utility class that parses a Samza SQL statement to work out its sources, sink and select query.
+ */
+public class SamzaSqlQueryParser {
+
+  // Overall statement shape accepted by parseQuery; group 2 is the "select ..." part of the statement.
+  // Compiled once here instead of on every parseQuery call.
+  private static final Pattern INSERT_INTO_SQL_PATTERN =
+      Pattern.compile("insert into (.*) (select .* from (.*))", Pattern.CASE_INSENSITIVE);
+
+  private SamzaSqlQueryParser() {
+  }
+
+  /**
+   * Holder for the pieces extracted from a single "insert into ... select ..." statement.
+   */
+  public static class QueryInfo {
+    private final List<String> sources;
+    private final String selectQuery;
+    private final String sink;
+    private final String sql;
+
+    /**
+     * @param selectQuery the "select ..." portion of the statement
+     * @param sources names of the sources referenced by the select query
+     * @param sink name of the insert target
+     * @param sql the full original statement
+     */
+    public QueryInfo(String selectQuery, List<String> sources, String sink, String sql) {
+      this.selectQuery = selectQuery;
+      this.sink = sink;
+      this.sources = sources;
+      this.sql = sql;
+    }
+
+    public List<String> getSources() {
+      return sources;
+    }
+
+    public String getSelectQuery() {
+      return selectQuery;
+    }
+
+    public String getSink() {
+      return sink;
+    }
+
+    public String getSql() {
+      return sql;
+    }
+  }
+
+  /**
+   * Parses a statement of the form "insert into SINK select ... from ..." into a {@link QueryInfo}.
+   * @param sql the statement to parse
+   * @return the sink, the select query text, the list of sources and the original statement
+   * @throws SamzaException if the statement does not match the expected shape, cannot be parsed,
+   *         is not an insert whose source is a select, or yields no sources
+   */
+  public static QueryInfo parseQuery(String sql) {
+    Matcher m = INSERT_INTO_SQL_PATTERN.matcher(sql);
+    if (!m.matches()) {
+      throw new SamzaException("Invalid query format");
+    }
+
+    Planner planner = createPlanner();
+    SqlNode sqlNode;
+    try {
+      sqlNode = planner.parse(sql);
+    } catch (SqlParseException e) {
+      throw new SamzaException(e);
+    }
+
+    if (!(sqlNode instanceof SqlInsert)) {
+      throw new SamzaException("Sql query is not of the expected format");
+    }
+    SqlInsert sqlInsert = (SqlInsert) sqlNode;
+    String sink = sqlInsert.getTargetTable().toString();
+
+    SqlNode insertSource = sqlInsert.getSource();
+    if (!(insertSource instanceof SqlSelect)) {
+      throw new SamzaException("Sql query is not of the expected format");
+    }
+    // The select text is taken from the regex match rather than re-rendered from the parse tree.
+    String selectQuery = m.group(2);
+    List<String> sources = getSourcesFromSelectQuery((SqlSelect) insertSource);
+
+    return new QueryInfo(selectQuery, sources, sink, sql);
+  }
+
+  /**
+   * Creates a Calcite planner configured with Java lexical rules and the Samza SQL type factory.
+   * The driver currently registered for "jdbc:calcite:" is deregistered and replaced by a
+   * {@link SamzaSqlDriver} before the connection supplying the root schema is opened.
+   * Any {@link SQLException} raised while doing so is rethrown as a {@link SamzaException}.
+   */
+  private static Planner createPlanner() {
+    Connection connection;
+    SchemaPlus rootSchema;
+    try {
+      JavaTypeFactory typeFactory = new SamzaSqlJavaTypeFactoryImpl();
+      SamzaSqlDriver driver = new SamzaSqlDriver(typeFactory);
+      DriverManager.deregisterDriver(DriverManager.getDriver("jdbc:calcite:"));
+      DriverManager.registerDriver(driver);
+      connection = driver.connect("jdbc:calcite:", new Properties());
+      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+      rootSchema = calciteConnection.getRootSchema();
+    } catch (SQLException e) {
+      throw new SamzaException(e);
+    }
+
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).build())
+        .defaultSchema(rootSchema)
+        .operatorTable(SqlStdOperatorTable.instance())
+        .traitDefs(traitDefs)
+        .context(Contexts.EMPTY_CONTEXT)
+        .costFactory(null)
+        .build();
+    return Frameworks.getPlanner(frameworkConfig);
+  }
+
+  /**
+   * Collects the source names referenced by the FROM clause of the given select.
+   * @throws SamzaException if no source could be found
+   */
+  private static List<String> getSourcesFromSelectQuery(SqlSelect sqlSelect) {
+    List<String> sources = new ArrayList<>();
+    getSource(sqlSelect.getFrom(), sources);
+    if (sources.isEmpty()) {
+      throw new SamzaException("Unsupported query " + sqlSelect);
+    }
+
+    return sources;
+  }
+
+  /**
+   * Walks a FROM-clause node and appends every source name found to {@code sourceList}.
+   * Joins contribute their left sources followed by their right sources, identifiers contribute
+   * their own name, AS calls are unwrapped to their first operand, UNNEST over a select
+   * contributes that select's sources, and nested selects are followed through their FROM clause.
+   * Any other node contributes nothing.
+   */
+  private static void getSource(SqlNode node, List<String> sourceList) {
+    if (node instanceof SqlJoin) {
+      SqlJoin joinNode = (SqlJoin) node;
+      getSource(joinNode.getLeft(), sourceList);
+      getSource(joinNode.getRight(), sourceList);
+    } else if (node instanceof SqlIdentifier) {
+      sourceList.add(node.toString());
+    } else if (node instanceof SqlBasicCall) {
+      SqlBasicCall basicCall = ((SqlBasicCall) node);
+      if (basicCall.getOperator() instanceof SqlAsOperator) {
+        getSource(basicCall.operand(0), sourceList);
+      } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
+        sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0)));
+      }
+    } else if (node instanceof SqlSelect) {
+      getSource(((SqlSelect) node).getFrom(), sourceList);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
new file mode 100644
index 0000000..f996987
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
@@ -0,0 +1,103 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.util;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility to read a .sql file and parse out the individual SQL statements in it.
+ * Samza SQL supports a sql file with multiple SQL statements where each SQL statement can be
+ * spread across multiple lines.
+ * It supports sql comments where a line starts with "--".
+ * It cannot support multiple sql statements in a single line.
+ * All the empty lines are ignored.
+ * All the SQL statements should start with "insert into".
+ *
+ * e.g. SQL File
+ * -- Sample comment
+ * insert into log.output1 select * from kafka.input1
+ *
+ * insert into log.output2
+ * select * from kafka.input2
+ *
+ * -- You may have empty lines in between a single query.
+ * insert into log.output3
+ *
+ * select * from kafka.input3
+ *
+ * -- Below line which contains multiple sql statements is not supported
+ * -- insert into log.output4 select * from kafka.input4 insert into log.output5 select * from kafka.input5
+ *
+ * -- Below SQL statement is not supported because it doesn't start with insert into
+ * -- select * from kafka.input6
+ */
+public class SqlFileParser {
+
+  private static final String INSERT_CMD = "insert";
+  private static final Logger LOG = LoggerFactory.getLogger(SqlFileParser.class);
+  private static final String SQL_COMMENT_PREFIX = "--";
+
+  private SqlFileParser() {
+  }
+
+  /**
+   * Reads the given file and splits it into SQL statements.
+   * A trimmed line starting with "insert" (case-insensitive) begins a new statement. Every other
+   * non-blank line that is not a "--" comment is appended to the current statement, separated by a
+   * single space.
+   * @param fileName path of the sql file, must not be empty
+   * @return the statements in file order
+   * @throws SamzaException if the file cannot be read
+   */
+  public static List<String> parseSqlFile(String fileName) {
+    Validate.notEmpty(fileName, "fileName cannot be empty.");
+    List<String> sqlLines;
+    // Files.lines keeps the file open until the stream is closed, so close it deterministically.
+    try (Stream<String> lines = Files.lines(Paths.get(fileName))) {
+      sqlLines = lines.collect(Collectors.toList());
+    } catch (IOException | UncheckedIOException e) {
+      // Read errors that occur while the stream is being consumed surface as UncheckedIOException.
+      String msg = String.format("Unable to parse the sql file %s", fileName);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    }
+    List<String> sqlStmts = new ArrayList<>();
+    String lastStatement = "";
+    for (String sqlLine : sqlLines) {
+      String sql = sqlLine.trim();
+      // Locale.ROOT keeps keyword detection independent of the JVM default locale
+      // (e.g. the Turkish locale lower-cases "I" to a dotless i).
+      if (sql.toLowerCase(Locale.ROOT).startsWith(INSERT_CMD)) {
+        if (StringUtils.isNotEmpty(lastStatement)) {
+          sqlStmts.add(lastStatement);
+        }
+
+        lastStatement = sql;
+      } else if (StringUtils.isNotBlank(sql) && !sql.startsWith(SQL_COMMENT_PREFIX)) {
+        lastStatement = String.format("%s %s", lastStatement, sql);
+      }
+    }
+
+    // Flush the statement that was still being accumulated when the file ended.
+    if (!StringUtils.isWhitespace(lastStatement)) {
+      sqlStmts.add(lastStatement);
+    }
+    return sqlStmts;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index 294eccd..8d2c588 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -30,9 +30,9 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.testutil.JsonUtil;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.JsonUtil;
+import org.apache.samza.sql.util.SamzaSqlQueryParser;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index ccab449..272ca4a 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -24,7 +24,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
deleted file mode 100644
index 018a733..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestArrayUdf.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.testutil;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.udfs.SamzaSqlUdf;
-import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
-import org.apache.samza.sql.udfs.ScalarUdf;
-
-
-@SamzaSqlUdf(name = "MyTestArray")
-public class MyTestArrayUdf implements ScalarUdf {
- @Override
- public void init(Config udfConfig) {
- }
-
- @SamzaSqlUdfMethod
- public List<String> execute(Object... args) {
- Integer value = (Integer) args[0];
- return IntStream.range(0, value).mapToObj(String::valueOf).collect(Collectors.toList());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
deleted file mode 100644
index 4241424..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/MyTestUdf.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.testutil;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.schema.SamzaSqlFieldType;
-import org.apache.samza.sql.udfs.SamzaSqlUdf;
-import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
-import org.apache.samza.sql.udfs.ScalarUdf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Test UDF used by unit and integration tests.
- */
-@SamzaSqlUdf(name = "MyTest")
-public class MyTestUdf implements ScalarUdf {
-
- private static final Logger LOG = LoggerFactory.getLogger(MyTestUdf.class);
-
- @SamzaSqlUdfMethod
- public Integer execute(Object... value) {
- return ((Integer) value[0]) * 2;
- }
-
- @Override
- public void init(Config udfConfig) {
- LOG.info("Init called with {}", udfConfig);
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
deleted file mode 100644
index 6ee9ee8..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/RemoteStoreIOResolverTestFactory.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
-import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
-import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.table.remote.TableWriteFunction;
-
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
-
-
-public class RemoteStoreIOResolverTestFactory implements SqlIOResolverFactory {
- public static final String TEST_REMOTE_STORE_SYSTEM = "testRemoteStore";
- public static final String TEST_TABLE_ID = "testTableId";
-
- public static transient Map<Object, Object> records = new HashMap<>();
-
- @Override
- public SqlIOResolver create(Config config, Config fullConfig) {
- return new TestRemoteStoreIOResolver(config);
- }
-
- public static class InMemoryWriteFunction implements TableWriteFunction<Object, Object> {
-
- @Override
- public CompletableFuture<Void> putAsync(Object key, Object record) {
- records.put(key.toString(), record);
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> deleteAsync(Object key) {
- records.remove(key.toString());
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
- }
-
- static class InMemoryReadFunction implements TableReadFunction<Object, Object> {
-
- @Override
- public CompletableFuture<Object> getAsync(Object key) {
- return CompletableFuture.completedFuture(records.get(key.toString()));
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
- }
-
- private class TestRemoteStoreIOResolver implements SqlIOResolver {
- private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
- private final Config config;
- private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
- private final String changeLogStorePrefix;
-
- public TestRemoteStoreIOResolver(Config config) {
- this.config = config;
- String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
- this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
- }
-
- private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
- String[] sourceComponents = ioName.split("\\.");
- int systemIdx = 0;
- int endIdx = sourceComponents.length - 1;
- int streamIdx = endIdx;
- TableDescriptor tableDescriptor = null;
-
- if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
- streamIdx = endIdx - 1;
-
- tableDescriptor = tableDescMap.get(ioName);
-
- if (tableDescriptor == null) {
- if (isSink) {
- tableDescriptor = new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + ioName.replace(".", "-").replace("$", "-"))
- .withReadFunction(new InMemoryReadFunction())
- .withWriteFunction(new InMemoryWriteFunction());
- } else if (sourceComponents[systemIdx].equals(TEST_REMOTE_STORE_SYSTEM)) {
- tableDescriptor = new RemoteTableDescriptor<>(TEST_TABLE_ID + "-" + ioName.replace(".", "-").replace("$", "-"))
- .withReadFunction(new InMemoryReadFunction());
- } else {
- // A local table
- String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
- SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
- (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
- SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
- (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
- tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled();
- }
- tableDescMap.put(ioName, tableDescriptor);
- }
- }
-
- Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
- return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
- Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
- }
-
- @Override
- public SqlIOConfig fetchSourceInfo(String sourceName) {
- return fetchIOInfo(sourceName, false);
- }
-
- @Override
- public SqlIOConfig fetchSinkInfo(String sinkName) {
- return fetchIOInfo(sinkName, true);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
deleted file mode 100644
index 7c67082..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.sql.avro.AvroRelConverter;
-import org.apache.samza.sql.avro.AvroRelSchemaProvider;
-import org.apache.samza.sql.interfaces.RelSchemaProvider;
-import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * SampleRelConverter is an {@link AvroRelConverter} which identifies alternate messages as system messages.
- * This is used purely for testing system messages.
- */
-public class SampleRelConverterFactory implements SamzaRelConverterFactory {
-
- private int i = 0;
-
- @Override
- public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config) {
- return new SampleRelConverter(systemStream, (AvroRelSchemaProvider) relSchemaProvider, config);
- }
-
- public class SampleRelConverter extends AvroRelConverter {
- public SampleRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
- super(systemStream, schemaProvider, config);
- }
-
- @Override
- public boolean isSystemMessage(KV<Object, Object> kv) {
- // Return alternate ones as system messages.
- return (i++) % 2 == 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
deleted file mode 100644
index ba40ee5..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.stream.Collectors;
-import org.apache.samza.sql.SamzaSqlRelRecord;
-import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
-
-
-/**
- * A sample {@link SamzaRelTableKeyConverter} used in tests: the table key is the first field value rendered as a String.
- */
-public class SampleRelTableKeyConverter implements SamzaRelTableKeyConverter {
- // A record nested in the first field is unwrapped one level first; note toString() is invoked on every field value.
- @Override
- public Object convertToTableKeyFormat(SamzaSqlRelRecord relRecord) {
- if (relRecord.getFieldValues().get(0) instanceof SamzaSqlRelRecord) {
- relRecord = (SamzaSqlRelRecord) relRecord.getFieldValues().get(0);
- }
- return relRecord.getFieldValues().stream().map(Object::toString).collect(Collectors.toList()).get(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
deleted file mode 100644
index 2853ed4..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverterFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.HashMap;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.interfaces.RelSchemaProvider;
-import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
-import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverterFactory;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * A sample {@link SamzaRelTableKeyConverterFactory} used in tests to create {@link SampleRelTableKeyConverter}.
- */
-public class SampleRelTableKeyConverterFactory implements SamzaRelTableKeyConverterFactory {
-
- private final HashMap<SystemStream, SamzaRelTableKeyConverter> relConverters = new HashMap<>();
- // Converters are cached: repeated calls for the same SystemStream return the same instance; config is not used.
- @Override
- public SamzaRelTableKeyConverter create(SystemStream systemStream, Config config) {
- return relConverters.computeIfAbsent(systemStream, ss -> new SampleRelTableKeyConverter());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a1e85ee/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
deleted file mode 100644
index 82e57d5..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import com.google.common.base.Joiner;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.sql.avro.AvroRelConverterFactory;
-import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
-import org.apache.samza.sql.avro.schemas.Company;
-import org.apache.samza.sql.avro.schemas.ComplexRecord;
-import org.apache.samza.sql.avro.schemas.EnrichedPageView;
-import org.apache.samza.sql.avro.schemas.PageView;
-import org.apache.samza.sql.avro.schemas.PageViewCount;
-import org.apache.samza.sql.avro.schemas.Profile;
-import org.apache.samza.sql.avro.schemas.SimpleRecord;
-import org.apache.samza.sql.fn.BuildOutputRecordUdf;
-import org.apache.samza.sql.fn.FlattenUdf;
-import org.apache.samza.sql.fn.RegexMatchUdf;
-import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
-import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.system.TestAvroSystemFactory;
-import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-
-import static org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory.TEST_REMOTE_STORE_SYSTEM;
-
-
-/**
- * Utility to hook up the configs needed to run the Samza SQL application in tests.
- */
-public class SamzaSqlTestConfig {
-
- public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
- public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2";
- public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
-
- public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
- return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
- }
-
- public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages) {
- return fetchStaticConfigsWithFactories(props, numberOfMessages, false);
- }
-
- public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
- boolean includeNullForeignKeys) {
- return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, false, 0);
- }
-
- public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
- boolean includeNullForeignKeys, boolean includeNullSimpleRecords) {
- return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, includeNullSimpleRecords, 0);
- }
- // Builds the full config map; the overloads above all end up delegating here.
- public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
- boolean includeNullForeignKeys, boolean includeNullSimpleRecords, long windowDurationMs) {
- HashMap<String, String> staticConfigs = new HashMap<>();
-
- staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
- staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
- staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
- staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
-
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
- String configIOResolverDomain =
- String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
- staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
- RemoteStoreIOResolverTestFactory.class.getName());
-
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
- String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
- staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
- ConfigBasedUdfResolver.class.getName());
- staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, Joiner.on(",")
- .join(MyTestUdf.class.getName(), RegexMatchUdf.class.getName(), FlattenUdf.class.getName(),
- MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName()));
-
- String avroSystemConfigPrefix =
- String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
- String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
- staticConfigs.put(avroSystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
- staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
- String.valueOf(numberOfMessages));
- staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
- includeNullForeignKeys ? "true" : "false");
- staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_SIMPLE_RECORDS,
- includeNullSimpleRecords ? "true" : "false");
- staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
- String.valueOf(windowDurationMs / 2));
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
- staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
- staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
-
- String testRemoteStoreSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", TEST_REMOTE_STORE_SYSTEM);
- staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
- staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_TABLE_KEY_CONVERTER, "sample");
- staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
- // Second Avro test system: CFG_INCLUDE_NULL_SIMPLE_RECORDS is not set for it; the window duration put is repeated.
- String avro2SystemConfigPrefix =
- String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO2);
- String avro2SamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO2);
- staticConfigs.put(avro2SystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
- staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
- String.valueOf(numberOfMessages));
- staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
- includeNullForeignKeys ? "true" : "false");
- staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
- String.valueOf(windowDurationMs / 2));
- staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
- staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
- staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
-
- String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB);
- staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
- staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
-
- String avroSamzaToRelMsgConverterDomain =
- String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
- staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
- AvroRelConverterFactory.class.getName());
-
- String testRemoteStoreSamzaToRelMsgConverterDomain =
- String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, TEST_REMOTE_STORE_SYSTEM);
- staticConfigs.put(testRemoteStoreSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
- AvroRelConverterFactory.class.getName());
-
- String testRemoteStoreSamzaRelTableKeyConverterDomain =
- String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN, "sample");
- staticConfigs.put(testRemoteStoreSamzaRelTableKeyConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
- SampleRelTableKeyConverterFactory.class.getName());
-
- String configAvroRelSchemaProviderDomain =
- String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
- staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
- ConfigBasedAvroRelSchemaProviderFactory.class.getName());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro2", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "SIMPLE2"), SimpleRecord.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "SIMPLE3"), SimpleRecord.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "simpleOutputTopic"), SimpleRecord.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString());
- // NOTE(review): "Profile" is registered with ComplexRecord's schema while "PROFILE" below uses Profile's - confirm intended.
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "Profile"), ComplexRecord.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "PROFILE"), Profile.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "PAGEVIEW"), PageView.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "COMPANY"), Company.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "enrichedPageViewTopic"), EnrichedPageView.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- "testavro", "pageViewCountTopic"), PageViewCount.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- TEST_REMOTE_STORE_SYSTEM, "testTable"), SimpleRecord.SCHEMA$.toString());
-
- staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
- TEST_REMOTE_STORE_SYSTEM, "Profile"), Profile.SCHEMA$.toString());
- // Caller-supplied props go in last, so they override any of the static entries above.
- staticConfigs.putAll(props);
-
- return staticConfigs;
- }
-}