You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/04/17 23:02:53 UTC
samza git commit: SAMZA-1651: Samza-sql - Implement GROUP BY SQL
operator
Repository: samza
Updated Branches:
refs/heads/master d7a071b34 -> aac6368a2
SAMZA-1651: Samza-sql - Implement GROUP BY SQL operator
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srini P<sp...@linkedin.com>
Closes #478 from atoomula/groupby1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/aac6368a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/aac6368a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/aac6368a
Branch: refs/heads/master
Commit: aac6368a2409764a98109109547a5af62860ff0f
Parents: d7a071b
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue Apr 17 16:02:49 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Apr 17 16:02:49 2018 -0700
----------------------------------------------------------------------
.../sql/runner/SamzaSqlApplicationConfig.java | 13 +++
.../translator/LogicalAggregateTranslator.java | 94 ++++++++++++++++++++
.../samza/sql/translator/QueryTranslator.java | 17 +++-
.../apache/samza/sql/TestQueryTranslator.java | 43 +++++++++
.../samza/sql/avro/schemas/PageViewCount.avsc | 45 ++++++++++
.../samza/sql/avro/schemas/PageViewCount.java | 56 ++++++++++++
.../samza/sql/system/TestAvroSystemFactory.java | 51 ++++++++---
.../samza/sql/testutil/SamzaSqlTestConfig.java | 12 +++
.../test/samzasql/TestSamzaSqlEndToEnd.java | 64 +++++++++++--
9 files changed, 373 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index aeb7f35..bcefae2 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -77,6 +77,11 @@ public class SamzaSqlApplicationConfig {
public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s.";
+
+ public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
+
+ private static final long DEFAULT_GROUPBY_WINDOW_DURATION_MS = 300000; // default groupby window duration is 5 mins.
+
private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
private final Map<String, SamzaRelConverter> samzaRelConvertersBySource;
@@ -92,6 +97,8 @@ public class SamzaSqlApplicationConfig {
private final List<QueryInfo> queryInfo;
+ private final long windowDurationMs;
+
public SamzaSqlApplicationConfig(Config staticConfig) {
sql = fetchSqlFromConfig(staticConfig);
@@ -126,6 +133,8 @@ public class SamzaSqlApplicationConfig {
x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig,
CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
relSchemaProvidersBySource.get(x.getSource()), c))));
+
+ windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
}
private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
@@ -244,4 +253,8 @@ public class SamzaSqlApplicationConfig {
public SourceResolver getSourceResolver() {
return sourceResolver;
}
+
+ /**
+ * Returns the window duration, in milliseconds, configured for GROUP BY queries.
+ *
+ * The value is read once in the constructor from {@code samza.sql.groupby.window.ms}
+ * ({@link #CFG_GROUPBY_WINDOW_DURATION_MS}) and falls back to 300000 ms (5 minutes) when the
+ * key is absent.
+ *
+ * @return the group-by window duration in milliseconds
+ */
+ public long getWindowDurationMs() {
+ return windowDurationMs;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
new file mode 100644
index 0000000..96179d6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.LongSerde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Translator to translate the LogicalAggregate node in the relational graph to the corresponding StreamGraph
+ * implementation.
+ *
+ * Only a single COUNT aggregate is supported. The aggregation is implemented as a keyed tumbling window over
+ * the input messages, with the window duration taken from the SQL application config.
+ */
+class LogicalAggregateTranslator {
+
+  // Logger must be bound to this class (not JoinTranslator) so that log output is attributed correctly.
+  private static final Logger log = LoggerFactory.getLogger(LogicalAggregateTranslator.class);
+  // Suffix used to give every tumbling window operator in the query a distinct id.
+  private final int windowId;
+
+  LogicalAggregateTranslator(int windowId) {
+    this.windowId = windowId;
+  }
+
+  /**
+   * Translates the aggregate node into a windowed count and registers the resulting stream in the context
+   * under the aggregate node's id.
+   *
+   * @param aggregate the aggregate node to translate; must carry exactly one COUNT aggregate call
+   * @param context the translator context holding the input stream and receiving the output stream
+   * @throws SamzaException if the aggregate does not consist of exactly one COUNT function
+   */
+  void translate(final LogicalAggregate aggregate, final TranslatorContext context) {
+    validateAggregateFunctions(aggregate);
+
+    MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(aggregate.getInput().getId());
+
+    // At this point, the assumption is that only count function is supported.
+    Supplier<Long> initialValue = () -> (long) 0;
+    FoldLeftFunction<SamzaSqlRelMessage, Long> foldCountFn = (m, c) -> c + 1;
+
+    MessageStream<SamzaSqlRelMessage> outputStream =
+        inputStream
+            // The whole input message is used as the window key.
+            .window(Windows.keyedTumblingWindow(m -> m,
+                Duration.ofMillis(context.getExecutionContext().getSamzaSqlApplicationConfig().getWindowDurationMs()),
+                initialValue,
+                foldCountFn,
+                new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
+                new LongSerde())
+                .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow_" + windowId)
+            .map(windowPane -> {
+                // Copy the key's field lists before appending the aggregate column, so the window key
+                // itself is never modified. NOTE(review): this presumes the getters expose the record's
+                // own lists (their implementation is not visible here); the copy is harmless either way.
+                List<String> fieldNames = new java.util.ArrayList<>(
+                    windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldNames());
+                List<Object> fieldValues = new java.util.ArrayList<>(
+                    windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues());
+                fieldNames.add(aggregate.getAggCallList().get(0).getName());
+                fieldValues.add(windowPane.getMessage());
+                return new SamzaSqlRelMessage(fieldNames, fieldValues);
+              });
+    context.registerMessageStream(aggregate.getId(), outputStream);
+  }
+
+  /**
+   * Verifies that the aggregate consists of exactly one aggregate call and that the call is COUNT.
+   *
+   * @param aggregate the aggregate node to validate
+   * @throws SamzaException if there is not exactly one aggregate call, or if it is not COUNT
+   */
+  void validateAggregateFunctions(final LogicalAggregate aggregate) {
+    if (aggregate.getAggCallList().size() != 1) {
+      String errMsg = "Windowing is supported ONLY with one aggregate function but the number of given functions are " +
+          aggregate.getAggCallList().size();
+      log.error(errMsg);
+      throw new SamzaException(errMsg);
+    }
+
+    if (aggregate.getAggCallList().get(0).getAggregation().getKind() != SqlKind.COUNT) {
+      String errMsg = "Windowing is supported ONLY with COUNT aggregate function";
+      log.error(errMsg);
+      throw new SamzaException(errMsg);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index b853537..c8d55e8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
@@ -64,9 +65,11 @@ public class QueryTranslator {
final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext);
final RelNode node = relRoot.project();
- final int[] joinId = new int[1];
node.accept(new RelShuttleImpl() {
+ int windowId = 0;
+ int joinId = 0;
+
@Override
public RelNode visit(TableScan scan) {
RelNode node = super.visit(scan);
@@ -91,9 +94,17 @@ public class QueryTranslator {
@Override
public RelNode visit(LogicalJoin join) {
RelNode node = super.visit(join);
- joinId[0]++;
+ joinId++;
SourceResolver sourceResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getSourceResolver();
- new JoinTranslator(joinId[0], sourceResolver).translate(join, context);
+ new JoinTranslator(joinId, sourceResolver).translate(join, context);
+ return node;
+ }
+
+ @Override
+ public RelNode visit(LogicalAggregate aggregate) {
+ RelNode node = super.visit(aggregate);
+ windowId++;
+ new LogicalAggregateTranslator(windowId).translate(aggregate, context);
return node;
}
});
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
index 5309838..3365923 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
@@ -19,12 +19,14 @@
package org.apache.samza.sql;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
@@ -347,6 +349,7 @@ public class TestQueryTranslator {
StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
translator.translate(queryInfo, streamGraph);
}
+
@Test
public void testTranslateStreamTableInnerJoin() {
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
@@ -464,4 +467,44 @@ public class TestQueryTranslator {
Assert.assertEquals("sql-job-1-partition_by-stream_1",
streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
}
+
+ /**
+ * Translates a GROUP BY query with a count(*) aggregate and checks that the resulting stream graph has
+ * exactly one input operator, one output stream, and reports a window or join.
+ */
+ @Test
+ public void testTranslateGroupBy() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+ String sql =
+ "Insert into testavro.pageViewCountTopic"
+ + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
+ + " from testavro.PAGEVIEW as pv"
+ + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+ + " group by (pv.pageKey)";
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+ QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+ SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+ StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+ translator.translate(queryInfo, streamGraph);
+
+ Assert.assertEquals(1, streamGraph.getInputOperators().size());
+ Assert.assertEquals(1, streamGraph.getOutputStreams().size());
+ Assert.assertTrue(streamGraph.hasWindowOrJoins());
+ // NOTE(review): operatorSpecs is fetched but never asserted on; either inspect it or drop the local.
+ Collection<OperatorSpec> operatorSpecs = streamGraph.getAllOperatorSpecs();
+ }
+
+ /**
+ * Translates a GROUP BY query that uses sum() instead of count() and expects translation to fail with a
+ * SamzaException (presumably raised by the COUNT-only validation in LogicalAggregateTranslator).
+ */
+ @Test (expected = SamzaException.class)
+ public void testTranslateGroupByWithSumAggregator() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
+ String sql =
+ "Insert into testavro.pageViewCountTopic"
+ + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) as `sum`"
+ + " from testavro.PAGEVIEW as pv" + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+ + " group by (pv.pageKey)";
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+ QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
+ SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
+ StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig);
+ // translate() is expected to throw before returning.
+ translator.translate(queryInfo, streamGraph);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageViewCount.avsc
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageViewCount.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageViewCount.avsc
new file mode 100644
index 0000000..b124f6f
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageViewCount.avsc
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+ "name": "PageViewCount",
+ "version" : 1,
+ "namespace": "org.apache.samza.sql.system.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name": "jobName",
+ "doc": "Job Name.",
+ "type": ["null", "string"],
+ "default":null
+ },
+ {
+ "name": "pageKey",
+ "doc": "Page key.",
+ "type": ["null", "string"],
+ "default":null
+ },
+ {
+ "name": "count",
+ "doc" : "Count of page views.",
+ "type": ["null", "int"],
+ "default":null
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageViewCount.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageViewCount.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageViewCount.java
new file mode 100644
index 0000000..6b3d25d
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageViewCount.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class PageViewCount extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"PageViewCount\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"jobName\",\"type\":[\"null\",\"string\"],\"doc\":\"Job Name.\",\"default\":null},{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"count\",\"type\":[\"null\",\"int\"],\"doc\":\"Count of page views.\",\"default\":null}]}");
+ /** Job Name. */
+ public java.lang.CharSequence jobName;
+ /** Page key. */
+ public java.lang.CharSequence pageKey;
+ /** Count of page views. */
+ public java.lang.Integer count;
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return jobName;
+ case 1: return pageKey;
+ case 2: return count;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: jobName = (java.lang.CharSequence)value$; break;
+ case 1: pageKey = (java.lang.CharSequence)value$; break;
+ case 2: count = (java.lang.Integer)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 4bd9741..9a45034 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -47,6 +47,7 @@ import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +58,7 @@ public class TestAvroSystemFactory implements SystemFactory {
public static final String CFG_NUM_MESSAGES = "numMessages";
public static final String CFG_INCLUDE_NULL_FOREIGN_KEYS = "includeNullForeignKeys";
- public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
+ public static final String CFG_SLEEP_BETWEEN_POLLS_MS = "sleepBetweenPollsMs";
private static final String[] profileNames = {"John", "Mike", "Mary", "Joe", "Brad", "Jennifer"};
private static final int[] profileZips = {94000, 94001, 94002, 94003, 94004, 94005};
@@ -65,17 +66,19 @@ public class TestAvroSystemFactory implements SystemFactory {
private static final String[] phoneNumbers = {"000-000-0000", "111-111-1111", "222-222-2222", "333-333-3333",
"444-444-4444", "555-555-5555"};
public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
- private static final String[] pagekeys = {"inbox", "home", "search", "pymk", "group", "job"};
+ public static final String[] pageKeys = {"inbox", "home", "search", "pymk", "group", "job"};
+
+ public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
public static List<String> getPageKeyProfileNameJoin(int numMessages) {
return IntStream.range(0, numMessages)
- .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profileNames[i % profileNames.length])
+ .mapToObj(i -> pageKeys[i % pageKeys.length] + "," + profileNames[i % profileNames.length])
.collect(Collectors.toList());
}
public static List<String> getPageKeyProfileNameAddressJoin(int numMessages) {
return IntStream.range(0, numMessages)
- .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profileNames[i % profileNames.length] + "," +
+ .mapToObj(i -> pageKeys[i % pageKeys.length] + "," + profileNames[i % profileNames.length] + "," +
profileZips[i % profileZips.length] + "," + streetNums[i % streetNums.length])
.collect(Collectors.toList());
}
@@ -83,24 +86,38 @@ public class TestAvroSystemFactory implements SystemFactory {
public static List<String> getPageKeyProfileNameJoinWithNullForeignKeys(int numMessages) {
// All even profileId foreign keys are null
return IntStream.range(0, numMessages / 2)
- .mapToObj(i -> pagekeys[(i * 2 + 1) % pagekeys.length] + "," + profileNames[(i * 2 + 1) % profileNames.length])
+ .mapToObj(i -> pageKeys[(i * 2 + 1) % pageKeys.length] + "," + profileNames[(i * 2 + 1) % profileNames.length])
.collect(Collectors.toList());
}
public static List<String> getPageKeyProfileNameOuterJoinWithNullForeignKeys(int numMessages) {
// All even profileId foreign keys are null
return IntStream.range(0, numMessages)
- .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + ((i % 2 == 0) ? "null" : profileNames[i % profileNames.length]))
+ .mapToObj(i -> pageKeys[i % pageKeys.length] + "," + ((i % 2 == 0) ? "null" : profileNames[i % profileNames.length]))
.collect(Collectors.toList());
}
public static List<String> getPageKeyProfileCompanyNameJoin(int numMessages) {
return IntStream.range(0, numMessages)
- .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profileNames[i % profileNames.length] +
+ .mapToObj(i -> pageKeys[i % pageKeys.length] + "," + profileNames[i % profileNames.length] +
"," + companies[i % companies.length])
.collect(Collectors.toList());
}
+ /**
+  * Computes the expected per-page-key message counts for a GROUP BY on pageKey.
+  *
+  * Generated messages cycle through {@code pageKeys} by index, so each key occurs
+  * {@code numMessages / pageKeys.length} times, plus one more for the first
+  * {@code numMessages % pageKeys.length} keys.
+  *
+  * @param numMessages total number of generated page view messages
+  * @param includePageKeys the page keys to report; all other keys are left out of the result
+  * @return map from each included page key to its expected count
+  */
+ public static HashMap<String, Integer> getPageKeyGroupByResult(int numMessages, Set<String> includePageKeys) {
+   HashMap<String, Integer> pageKeyCountMap = new HashMap<>();
+   int quotient = numMessages / pageKeys.length;
+   int remainder = numMessages % pageKeys.length;
+   // A plain loop is used on purpose: the previous IntStream.map(...) pipeline had no terminal
+   // operation, so it was never evaluated and the returned map was always empty.
+   for (int k = 0; k < pageKeys.length; k++) {
+     if (includePageKeys.contains(pageKeys[k])) {
+       pageKeyCountMap.put(pageKeys[k], quotient + ((k < remainder) ? 1 : 0));
+     }
+   }
+   return pageKeyCountMap;
+ }
+
@Override
public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
return new TestAvroSystemConsumer(systemName, config);
@@ -120,15 +137,18 @@ public class TestAvroSystemFactory implements SystemFactory {
public static final int DEFAULT_NUM_EVENTS = 10;
private final int numMessages;
private final boolean includeNullForeignKeys;
+ private final long sleepBetweenPollsMs;
private final Set<SystemStreamPartition> simpleRecordMap = new HashSet<>();
private final Set<SystemStreamPartition> profileRecordMap = new HashSet<>();
private final Set<SystemStreamPartition> companyRecordMap = new HashSet<>();
private final Set<SystemStreamPartition> pageViewRecordMap = new HashSet<>();
+ private final Map<SystemStreamPartition, Integer> curMessagesPerSsp = new HashMap<>();
public TestAvroSystemConsumer(String systemName, Config config) {
numMessages = config.getInt(String.format("systems.%s.%s", systemName, CFG_NUM_MESSAGES), DEFAULT_NUM_EVENTS);
includeNullForeignKeys = config.getBoolean(String.format("systems.%s.%s", systemName,
CFG_INCLUDE_NULL_FOREIGN_KEYS), false);
+ sleepBetweenPollsMs = config.getLong(String.format("systems.%s.%s", systemName, CFG_SLEEP_BETWEEN_POLLS_MS), 0);
}
@Override
@@ -153,6 +173,7 @@ public class TestAvroSystemFactory implements SystemFactory {
if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) {
pageViewRecordMap.add(systemStreamPartition);
}
+ curMessagesPerSsp.put(systemStreamPartition, 0);
}
@Override
@@ -160,13 +181,19 @@ public class TestAvroSystemFactory implements SystemFactory {
throws InterruptedException {
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = new HashMap<>();
set.forEach(ssp -> {
+ int curMessages = curMessagesPerSsp.get(ssp);
// We send num Messages and an end of stream message following that.
- List<IncomingMessageEnvelope> envelopes = IntStream.range(0, numMessages + 1)
- .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, "key" + i,
- getData(i, ssp)) : IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp))
- .collect(Collectors.toList());
+ List<IncomingMessageEnvelope> envelopes =
+ IntStream.range(curMessages, curMessages + numMessages/4)
+ .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, "key" + i,
+ getData(i, ssp)) : IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp))
+ .collect(Collectors.toList());
envelopeMap.put(ssp, envelopes);
+ curMessagesPerSsp.put(ssp, curMessages + numMessages/4);
});
+ if (sleepBetweenPollsMs > 0) {
+ Thread.sleep(sleepBetweenPollsMs);
+ }
return envelopeMap;
}
@@ -256,7 +283,7 @@ public class TestAvroSystemFactory implements SystemFactory {
GenericRecord record = new GenericData.Record(PageView.SCHEMA$);
// All even profileId foreign keys are null
record.put("profileId", includeNullForeignKeys && (index % 2 == 0) ? null : index);
- record.put("pageKey", pagekeys[index % pagekeys.length]);
+ record.put("pageKey", pageKeys[index % pageKeys.length]);
return record;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index b8b2814..208625d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -32,6 +32,7 @@ import org.apache.samza.sql.avro.schemas.Company;
import org.apache.samza.sql.avro.schemas.ComplexRecord;
import org.apache.samza.sql.avro.schemas.EnrichedPageView;
import org.apache.samza.sql.avro.schemas.PageView;
+import org.apache.samza.sql.avro.schemas.PageViewCount;
import org.apache.samza.sql.avro.schemas.Profile;
import org.apache.samza.sql.avro.schemas.SimpleRecord;
import org.apache.samza.sql.fn.FlattenUdf;
@@ -62,6 +63,11 @@ public class SamzaSqlTestConfig {
public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
boolean includeNullForeignKeys) {
+ return fetchStaticConfigsWithFactories(props, numberOfMessages, includeNullForeignKeys, 0);
+ }
+
+ public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages,
+ boolean includeNullForeignKeys, long windowDurationMs) {
HashMap<String, String> staticConfigs = new HashMap<>();
staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
@@ -92,6 +98,9 @@ public class SamzaSqlTestConfig {
String.valueOf(numberOfMessages));
staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
includeNullForeignKeys ? "true" : "false");
+ staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
+ String.valueOf(windowDurationMs / 2));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro");
staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
@@ -129,6 +138,9 @@ public class SamzaSqlTestConfig {
staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
"testavro", "enrichedPageViewTopic"), EnrichedPageView.SCHEMA$.toString());
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "pageViewCountTopic"), PageViewCount.SCHEMA$.toString());
+
staticConfigs.putAll(props);
return staticConfigs;
http://git-wip-us.apache.org/repos/asf/samza/blob/aac6368a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index a41463e..d511a39 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -23,8 +23,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
@@ -41,6 +43,7 @@ import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,7 +200,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
.collect(Collectors.toList());
Assert.assertEquals(numMessages, outMessages.size());
List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
- Assert.assertEquals(outMessages, expectedOutMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
}
@Test
@@ -265,7 +268,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
.stream()
.filter(msg -> msg.endsWith("Mike"))
.collect(Collectors.toList());
- Assert.assertEquals(outMessages, expectedOutMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
}
@Test
@@ -294,7 +297,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
// Half the foreign keys are null.
Assert.assertEquals(numMessages / 2, outMessages.size());
List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages);
- Assert.assertEquals(outMessages, expectedOutMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
}
@Test
@@ -323,7 +326,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
Assert.assertEquals(numMessages, outMessages.size());
List<String> expectedOutMessages =
TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages);
- Assert.assertEquals(outMessages, expectedOutMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
}
@Test
@@ -352,7 +355,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
Assert.assertEquals(numMessages, outMessages.size());
List<String> expectedOutMessages =
TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages);
- Assert.assertEquals(outMessages, expectedOutMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
}
@Test
@@ -382,7 +385,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
.collect(Collectors.toList());
Assert.assertEquals(numMessages, outMessages.size());
List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(numMessages);
- Assert.assertEquals(outMessages, expectedOutMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
}
@Test
@@ -413,7 +416,54 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
Assert.assertEquals(TestAvroSystemFactory.companies.length, outMessages.size());
List<String> expectedOutMessages =
TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(TestAvroSystemFactory.companies.length);
- Assert.assertEquals(outMessages, expectedOutMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
}
+ // Disabling the test until SAMZA-1652 and SAMZA-1661 are fixed.
+ // Runs a GROUP BY count(*) query end to end, collects the per-window counts emitted for each page key,
+ // and checks that their per-key sum matches the expected totals.
+ @Ignore
+ @Test
+ public void testEndToEndGroupBy() throws Exception {
+   int numMessages = 200;
+   long windowDurationMs = 200;
+
+   TestAvroSystemFactory.messages.clear();
+   Map<String, String> staticConfigs =
+       SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, false, windowDurationMs);
+   staticConfigs.putAll(configs);
+   String sql =
+       "Insert into testavro.pageViewCountTopic"
+           + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
+           + " from testavro.PAGEVIEW as pv"
+           + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
+           + " group by (pv.pageKey)";
+
+   List<String> sqlStmts = Arrays.asList(sql);
+   staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+   SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+   runner.runAndWaitForFinish();
+
+   // Let's capture the list of windows/counts per key.
+   // forEach is required here: a bare stream().map(...) has no terminal operation and would never run,
+   // leaving the map empty.
+   HashMap<String, List<String>> pageKeyCountListMap = new HashMap<>();
+   TestAvroSystemFactory.messages.forEach(x -> {
+       String pageKey = ((GenericRecord) x.getMessage()).get("pageKey").toString();
+       String count = ((GenericRecord) x.getMessage()).get("count").toString();
+       pageKeyCountListMap.computeIfAbsent(pageKey, k -> new ArrayList<>()).add(count);
+     });
+
+   HashMap<String, Integer> pageKeyCountMap = new HashMap<>();
+   pageKeyCountListMap.forEach((key, list) -> {
+       // Check that each key produced more than one window but fewer windows than input messages per key.
+       Assert.assertTrue(list.size() > 1 && list.size() < numMessages / TestAvroSystemFactory.pageKeys.length);
+       // Collapse the count of messages per key
+       pageKeyCountMap.put(key, list.stream().mapToInt(Integer::parseInt).sum());
+     });
+
+   Set<String> pageKeys = new HashSet<>(Arrays.asList("job", "inbox"));
+   HashMap<String, Integer> expectedPageKeyCountMap =
+       TestAvroSystemFactory.getPageKeyGroupByResult(numMessages, pageKeys);
+
+   Assert.assertEquals(expectedPageKeyCountMap, pageKeyCountMap);
+ }
}