You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/03/05 00:21:55 UTC
[samza] branch master updated: Propagating the system messages to
the stream. (#937)
This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3b8dc03 Propagating the system messages to the stream. (#937)
3b8dc03 is described below
commit 3b8dc03b22bf37aeee192190daf26653de5d61ea
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Mon Mar 4 16:21:51 2019 -0800
Propagating the system messages to the stream. (#937)
* Adding system messages to the stream
* checkstyle fixes
---
.../samza/sql/data/SamzaSqlRelMsgMetadata.java | 24 +++++-
.../sql/runner/SamzaSqlApplicationConfig.java | 8 ++
.../samza/sql/translator/ProjectTranslator.java | 1 -
.../samza/sql/translator/QueryTranslator.java | 12 +++
.../samza/sql/translator/ScanTranslator.java | 98 +++++++++++++++-------
.../TranslatorInputMetricsMapFunction.java | 3 +-
.../TranslatorOutputMetricsMapFunction.java | 2 +-
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 25 +++++-
8 files changed, 133 insertions(+), 40 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
index 713ecbe..14f2892 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
@@ -20,6 +20,8 @@
package org.apache.samza.sql.data;
import java.io.Serializable;
+import java.time.Instant;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -36,9 +38,16 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
public boolean isNewInputMessage = true;
/**
- *
+ * Indicates whether the SamzaSqlMessage is a system message or not.
*/
- public String operatorBeginProcessingInstant = null;
+ @JsonIgnore
+ private boolean isSystemMessage = false;
+
+ /**
+ * Time at which the join operation started for the message.
+ * If there is no join node in the operator graph, this will be -1.
+ */
+ public long joinStartTimeMs = -1;
/**
@@ -93,7 +102,6 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
public boolean hasArrivalTime() { return arrivalTime != null && !arrivalTime.isEmpty(); }
-
@JsonProperty("scanTime")
public String getscanTime() { return scanTime;}
@@ -103,6 +111,16 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
public boolean hasScanTime() { return scanTime != null && !scanTime.isEmpty(); }
+ @JsonIgnore
+ public void setIsSystemMessage(boolean isSystemMessage) {
+ this.isSystemMessage = isSystemMessage;
+ }
+
+ @JsonIgnore
+ public boolean isSystemMessage() {
+ return isSystemMessage;
+ }
+
@Override
public String toString() {
return "[Metadata:{" + eventTime + " " + arrivalTime + " " + scanTime + "}]";
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 4883dfb..d521681 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -89,6 +89,7 @@ public class SamzaSqlApplicationConfig {
public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix";
public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
+ public static final String CFG_SQL_PROCESS_SYSTEM_EVENTS = "samza.sql.processSystemEvents";
public static final String SAMZA_SYSTEM_LOG = "log";
@@ -115,6 +116,7 @@ public class SamzaSqlApplicationConfig {
private final String metadataTopicPrefix;
private final long windowDurationMs;
+ private final boolean processSystemEvents;
public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams,
List<String> outputSystemStreams) {
@@ -165,6 +167,8 @@ public class SamzaSqlApplicationConfig {
metadataTopicPrefix =
staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+
+ processSystemEvents = staticConfig.getBoolean(CFG_SQL_PROCESS_SYSTEM_EVENTS, true);
windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
}
@@ -324,4 +328,8 @@ public class SamzaSqlApplicationConfig {
public long getWindowDurationMs() {
return windowDurationMs;
}
+
+ public boolean isProcessSystemEvents() {
+ return processSystemEvents;
+ }
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 3378788..6e6ff45 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -193,5 +193,4 @@ class ProjectTranslator {
context.registerMessageStream(project.getId(), outputStream);
context.registerRelNode(project.getId(), project);
}
-
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index d3c8fa9..ce4737c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -50,6 +50,7 @@ import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
@@ -293,6 +294,17 @@ public class QueryTranslator {
GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
OutputStream stm = outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(osd));
outputStream.sendTo(stm);
+
+ // Process system events only if the output is a stream.
+ if (sqlConfig.isProcessSystemEvents()) {
+ for( MessageStream<SamzaSqlInputMessage> inputStream : inputMsgStreams.values()) {
+ MessageStream<KV<Object, Object>> systemEventStream =
+ inputStream.filter(message -> message.getMetadata().isSystemMessage())
+ .map(SamzaSqlInputMessage::getKeyAndMessageKV);
+
+ systemEventStream.sendTo(stm);
+ }
+ }
} else {
Table outputTable = appDesc.getTable(tableDescriptor.get());
if (outputTable == null) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index e044f6f..5fa04d8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -1,21 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.translator;
@@ -36,8 +36,8 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.sql.SamzaSqlInputTransformer;
import org.apache.samza.sql.SamzaSqlInputMessage;
+import org.apache.samza.sql.SamzaSqlInputTransformer;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
@@ -48,6 +48,7 @@ import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+
/**
* Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph
* implementation
@@ -78,7 +79,7 @@ class ScanTranslator {
@Override
public boolean apply(SamzaSqlInputMessage samzaSqlInputMessage) {
- return !relConverter.isSystemMessage(samzaSqlInputMessage.getKeyAndMessageKV());
+ return !samzaSqlInputMessage.getMetadata().isSystemMessage();
}
}
@@ -147,11 +148,11 @@ class ScanTranslator {
queryInputEvents.inc();
processingTime.update(Duration.between(startProcessing, endProcessing).toMillis());
}
-
} // ScanMapFunction
- void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId, final TranslatorContext context,
- Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
+ void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId,
+ final TranslatorContext context, Map<String, DelegatingSystemDescriptor> systemDescriptors,
+ Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
List<String> tableNameParts = tableScan.getTable().getQualifiedName();
String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
@@ -162,9 +163,9 @@ class ScanTranslator {
final String streamId = sqlIOConfig.getStreamId();
final String source = sqlIOConfig.getSource();
- final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
- (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
- sqlIOConfig.getTableDescriptor().get() instanceof CachingTableDescriptor);
+ final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() && (
+ sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor || sqlIOConfig.getTableDescriptor()
+ .get() instanceof CachingTableDescriptor);
// For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the
// SqlIOResolverFactory.
@@ -181,22 +182,55 @@ class ScanTranslator {
systemDescriptors.put(systemName, systemDescriptor);
} else {
/* in SamzaSQL, there should be no systemDescriptor setup by user, so this branch happens only
- * in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input
- * used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */
+ * in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input
+ * used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */
if (systemDescriptor.getTransformer().isPresent()) {
InputTransformer existingTransformer = systemDescriptor.getTransformer().get();
if (!(existingTransformer instanceof SamzaSqlInputTransformer)) {
- throw new SamzaException("SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer");
+ throw new SamzaException(
+ "SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer");
}
}
}
InputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, new NoOpSerde<>());
- MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
- inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(inputDescriptor))
- .filter(new FilterSystemMessageFunction(sourceName, queryId))
- .map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));
+
+ if (!inputMsgStreams.containsKey(source)) {
+ MessageStream<SamzaSqlInputMessage> inputMsgStream = streamAppDesc.getInputStream(inputDescriptor);
+ inputMsgStreams.put(source, inputMsgStream.map(new SystemMessageMapperFunction(source, queryId)));
+ }
+ MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputMsgStreams.get(source)
+ .filter(new FilterSystemMessageFunction(sourceName, queryId))
+ .map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));
context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
}
+
+ /**
+ * Function that populates whether the message is a system message.
+ * TODO This should ideally be populated by the InputTransformer in future.
+ */
+ private static class SystemMessageMapperFunction implements MapFunction<SamzaSqlInputMessage, SamzaSqlInputMessage> {
+ private final String source;
+ private final int queryId;
+ private transient SamzaRelConverter relConverter;
+
+ public SystemMessageMapperFunction(String source, int queryId) {
+ this.source = source;
+ this.queryId = queryId;
+ }
+
+ @Override
+ public void init(Context context) {
+ TranslatorContext translatorContext =
+ ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+ relConverter = translatorContext.getMsgConverter(source);
+ }
+
+ @Override
+ public SamzaSqlInputMessage apply(SamzaSqlInputMessage message) {
+ message.getMetadata().setIsSystemMessage(relConverter.isSystemMessage(message.getKeyAndMessageKV()));
+ return message;
+ }
+ }
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
index bb3300a..ef3028e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
@@ -19,7 +19,6 @@
package org.apache.samza.sql.translator;
-import com.google.common.annotations.VisibleForTesting;
import java.time.Instant;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
@@ -60,7 +59,7 @@ class TranslatorInputMetricsMapFunction implements MapFunction<SamzaSqlRelMessag
@Override
public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
inputEvents.inc();
- message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant = Instant.now().toString();
+ message.getSamzaSqlRelMsgMetadata().joinStartTimeMs = Instant.now().toEpochMilli();
return message;
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
index f1757fb..3e85bed 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
@@ -63,7 +63,7 @@ class TranslatorOutputMetricsMapFunction implements MapFunction<SamzaSqlRelMessa
@Override
public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
Instant endProcessing = Instant.now();
- Instant beginProcessing = Instant.parse(message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant);
+ Instant beginProcessing = Instant.ofEpochMilli(message.getSamzaSqlRelMsgMetadata().joinStartTimeMs);
outputEvents.inc();
processingTime.update(Duration.between(beginProcessing, endProcessing).toMillis());
return message;
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index e69ae9a..e0264ee 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -107,6 +107,29 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
.map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
.sorted()
.collect(Collectors.toList());
+ Assert.assertEquals(numMessages, outMessages.size());
+ }
+
+ @Test
+ public void testEndToEndDisableSystemMessages() {
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ String avroSamzaToRelMsgConverterDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
+ staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ SampleRelConverterFactory.class.getName());
+ String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_PROCESS_SYSTEM_EVENTS, "false");
+ runApplication(new MapConfig(staticConfigs));
+
+ List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+ .sorted()
+ .collect(Collectors.toList());
Assert.assertEquals((numMessages + 1) / 2, outMessages.size());
}
@@ -174,7 +197,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
Assert.assertEquals(numMessages, outMessagesSet.size());
Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
}
-
+
@Test
public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
int numMessages = 20;