You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/01/31 00:22:59 UTC
[samza] branch master updated: SAMZA-2029: Samza-SQL Diagnostics:
propagate eventTime and assign arrivalTime
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new efc0dca SAMZA-2029: Samza-SQL Diagnostics: propagate eventTime and assign arrivalTime
new fc23a1f Merge pull request #863 from shenodaguirguis/eventtime
efc0dca is described below
commit efc0dca50cd8e20cb22551a48f187e52bd6361eb
Author: Shenoda Guirguis <sg...@linkedin.com>
AuthorDate: Fri Dec 7 11:27:03 2018 -0800
SAMZA-2029: Samza-SQL Diagnostics: propagate eventTime and assign arrivalTime
---
.../samza/system/IncomingMessageEnvelope.java | 45 +++++++++++++---
.../descriptors/DelegatingSystemDescriptor.java | 12 ++++-
.../samza/system/inmemory/InMemoryManager.java | 4 +-
.../apache/samza/serializers/SerdeManager.scala | 5 +-
.../system/hdfs/reader/MultiFileHdfsReader.java | 3 +-
.../samza/system/kafka/KafkaConsumerProxy.java | 4 +-
.../kafka_deprecated/KafkaSystemConsumer.scala | 20 +++----
.../org/apache/samza/sql/SamzaSqlInputMessage.java | 61 ++++++++++++++++++++++
.../apache/samza/sql/SamzaSqlInputTransformer.java | 48 +++++++++++++++++
.../apache/samza/sql/data/SamzaSqlRelMessage.java | 13 ++++-
.../samza/sql/translator/QueryTranslator.java | 3 +-
.../samza/sql/translator/ScanTranslator.java | 54 ++++++++++++-------
12 files changed, 230 insertions(+), 42 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index c5aed31..ae8335e 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -20,6 +20,7 @@
package org.apache.samza.system;
import java.nio.charset.Charset;
+import java.time.Instant;
/**
* This class represents a message envelope that is received by a StreamTask for each message that is received from a
@@ -36,7 +37,10 @@ public class IncomingMessageEnvelope {
private final Object key;
private final Object message;
private final int size;
- private long timestamp = 0L;
+ // the timestamp when this event occurred, should be set by the event source, 0 means unassigned
+ private long eventTime = 0L;
+ // the timestamp when this event is picked up by Samza, 0 means unassigned
+ private long arrivalTime = 0L;
/**
* Constructs a new IncomingMessageEnvelope from specified components.
@@ -66,14 +70,41 @@ public class IncomingMessageEnvelope {
this.key = key;
this.message = message;
this.size = size;
+ this.arrivalTime = Instant.now().toEpochMilli();
}
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
+ /**
+ * Constructs a new IncomingMessageEnvelope from specified components
+ * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
+ * from which the stream came, and the partition of the stream from which the message was received.
+ * @param offset The offset in the partition that the message was received from.
+ * @param key A deserialized key received from the partition offset.
+ * @param message A deserialized message received from the partition offset.
+ * @param size size of the message and key in bytes.
+ * @param eventTime the timestamp (in epochMillis) of when this event happened
+ * @param arrivalTime the timestamp (in epochMillis) of when this event arrived to (i.e., was picked-up by) Samza
+ */
+ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset,
+ Object key, Object message, int size, long eventTime, long arrivalTime) {
+ this(systemStreamPartition, offset, key, message, size);
+ this.eventTime = eventTime;
+ this.arrivalTime = arrivalTime;
}
- public long getTimestamp() {
- return timestamp;
+ /**
+ * Getter for event time
+ * @return this.eventTime
+ */
+ public long getEventTime() {
+ return eventTime;
+ }
+
+ /**
+ * Getter for arrival time
+ * @return this.arrivalTime
+ */
+ public long getArrivalTime() {
+ return arrivalTime;
}
public SystemStreamPartition getSystemStreamPartition() {
@@ -159,6 +190,8 @@ public class IncomingMessageEnvelope {
@Override
public String toString() {
- return "IncomingMessageEnvelope [systemStreamPartition=" + systemStreamPartition + ", offset=" + offset + ", key=" + key + ", message=" + message + "]";
+ return "IncomingMessageEnvelope [systemStreamPartition=" + systemStreamPartition + ", offset=" + offset +
+ ", key=" + key + ", message=" + message + ", eventTime=" + eventTime +
+ ", arrivalTime=" + arrivalTime + "]";
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java b/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
index aa0f6a4..018b710 100644
--- a/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
@@ -18,9 +18,7 @@
*/
package org.apache.samza.system.descriptors;
-
import com.google.common.annotations.VisibleForTesting;
-
import org.apache.samza.serializers.Serde;
/**
@@ -47,6 +45,16 @@ public final class DelegatingSystemDescriptor extends SystemDescriptor<Delegatin
super(systemName, null, null, null);
}
+ /**
+ * Constructs a {@link DelegatingSystemDescriptor} instance with the given transformer and no system
+ * level Serde. Serdes are to be provided at stream level when getting input/output descriptors.
+ * @param systemName name of this system
+ * @param transformer the {@link InputTransformer} to apply to this system's input
+ */
+ public DelegatingSystemDescriptor(String systemName, InputTransformer transformer) {
+ super(systemName, null, transformer, null);
+ }
+
@Override
public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
String streamId, Serde<StreamMessageType> serde) {
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index 8463e56..f46650f 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.system.inmemory;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -68,7 +69,8 @@ class InMemoryManager {
offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
}
- IncomingMessageEnvelope messageEnvelope = new IncomingMessageEnvelope(ssp, offset, key, message);
+ IncomingMessageEnvelope messageEnvelope = new IncomingMessageEnvelope(ssp, offset, key, message,
+ 0, 0L, Instant.now().toEpochMilli());
bufferedMessages.get(ssp)
.add(messageEnvelope);
}
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
index 60c325d..b6011f4 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -148,7 +148,10 @@ class SerdeManager(
envelope.getSystemStreamPartition,
envelope.getOffset,
key,
- message)
+ message,
+ envelope.getSize,
+ envelope.getEventTime(),
+ envelope.getArrivalTime)
}
}
}
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
index 7870713..eea68bb 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
@@ -160,7 +160,8 @@ public class MultiFileHdfsReader {
IncomingMessageEnvelope messageEnvelope = curReader.readNext();
// Copy everything except for the offset. Turn the single-file style offset into a multi-file one
return new IncomingMessageEnvelope(messageEnvelope.getSystemStreamPartition(), getCurOffset(),
- messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize());
+ messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize(),
+ messageEnvelope.getEventTime(), messageEnvelope.getArrivalTime());
}
/**
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index e47add7..0ea16e7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -21,6 +21,7 @@
package org.apache.samza.system.kafka;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -312,7 +313,8 @@ class KafkaConsumerProxy<K, V> {
K key = record.key();
Object value = record.value();
IncomingMessageEnvelope imEnvelope =
- new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record));
+ new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record),
+ record.timestamp(), Instant.now().toEpochMilli());
messages.add(imEnvelope);
}
if (LOG.isDebugEnabled()) {
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
index b7c4368..126765a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
@@ -19,6 +19,7 @@
package org.apache.samza.system.kafka_deprecated
+import java.time.Instant
import kafka.common.TopicAndPartition
import org.apache.samza.util.Logging
import kafka.message.Message
@@ -283,15 +284,16 @@ private[kafka_deprecated] class KafkaSystemConsumer(
null
}
- if(fetchLimitByBytesEnabled ) {
- val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))
- ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
- put(systemStreamPartition, ime)
- } else {
- val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)
- ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
- put(systemStreamPartition, ime)
- }
+ val eventTime = if (!msg.message.isNull) msg.message.timestamp else 0L
+ val arrivalTime = Instant.now().toEpochMilli()
+ val ime = if(fetchLimitByBytesEnabled ) {
+ new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message),
+ eventTime, arrivalTime)
+ } else {
+ new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, 0,
+ eventTime, arrivalTime)
+ }
+ put(systemStreamPartition, ime)
setIsAtHead(systemStreamPartition, isAtHead)
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputMessage.java
new file mode 100644
index 0000000..ad6706c
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql;
+
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
+
+
+/**
+ * Represents the message produced from an IncomingMessageEnvelope by SamzaSqlInputTransformer.
+ * It pairs the event's key and message (wrapped in a KV) with the event's metadata.
+ */
+public class SamzaSqlInputMessage {
+ private final KV<Object, Object> keyAndMessageKV;
+ private final SamzaSqlRelMsgMetadata metadata;
+
+ /**
+ * Constructs a new SamzaSqlInputMessage from the given arguments; private, use the static factory instead.
+ * @param keyAndMessageKV the key and message of the event, wrapped in a KV
+ * @param metadata the metadata associated with the event
+ */
+ private SamzaSqlInputMessage(KV<Object, Object> keyAndMessageKV, SamzaSqlRelMsgMetadata metadata) {
+ this.keyAndMessageKV = keyAndMessageKV;
+ this.metadata = metadata;
+ }
+
+ /**
+ * Creates a new SamzaSqlInputMessage from the given arguments.
+ * @param keyAndMessageKV the key and message of the event, wrapped in a KV
+ * @param metadata the metadata associated with the event
+ * @return a new SamzaSqlInputMessage holding the given key/message pair and metadata
+ */
+ public static SamzaSqlInputMessage of (KV<Object, Object> keyAndMessageKV, SamzaSqlRelMsgMetadata metadata) {
+ return new SamzaSqlInputMessage(keyAndMessageKV, metadata);
+ }
+
+ public KV<Object, Object> getKeyAndMessageKV() {
+ return keyAndMessageKV;
+ }
+
+ public SamzaSqlRelMsgMetadata getMetadata() {
+ return metadata;
+ }
+}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputTransformer.java b/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputTransformer.java
new file mode 100644
index 0000000..948dff2
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputTransformer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql;
+
+import java.time.Instant;
+import net.jodah.failsafe.internal.util.Assert;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.descriptors.InputTransformer;
+
+
+/**
+ * Input transformer for SamzaSQL that consumes an {@link IncomingMessageEnvelope} (IME) and produces a
+ * {@link SamzaSqlInputMessage}, so that the event metadata (currently eventTime and arrivalTime) is copied
+ * from the IME to the SamzaSQL layer. Both times are converted from epoch milliseconds to the string
+ * form returned by {@link Instant#toString()}; the IME's key and message are passed along as a KV.
+ */
+
+public class SamzaSqlInputTransformer implements InputTransformer {
+
+ @Override
+ public Object apply(IncomingMessageEnvelope ime) {
+ Assert.notNull(ime, "ime is null");
+ KV<Object, Object> keyAndMessageKV = KV.of(ime.getKey(), ime.getMessage());
+ SamzaSqlRelMsgMetadata metadata = new SamzaSqlRelMsgMetadata(Instant.ofEpochMilli(ime.getEventTime()).toString(),
+ Instant.ofEpochMilli(ime.getArrivalTime()).toString(), null);
+ SamzaSqlInputMessage samzaMsg = SamzaSqlInputMessage.of(keyAndMessageKV, metadata);
+ return samzaMsg;
+ }
+}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index eb810d4..0ffc845 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -20,7 +20,6 @@
package org.apache.samza.sql.data;
import java.io.Serializable;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -127,6 +126,18 @@ public class SamzaSqlRelMessage implements Serializable {
return key;
}
+ public void setEventTime(String eventTime) {
+ this.samzaSqlRelMsgMetadata.setEventTime(eventTime);
+ }
+
+ public void setArrivalTime(String arrivalTime) {
+ this.samzaSqlRelMsgMetadata.setArrivalTime(arrivalTime);
+ }
+
+ public void setScanTime(String scanTime) {
+ this.samzaSqlRelMsgMetadata.setScanTime(scanTime);
+ }
+
@Override
public int hashCode() {
return Objects.hash(key, samzaSqlRelRecord);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 0fdfd39..d3c8fa9 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -53,6 +53,7 @@ import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.sql.SamzaSqlInputMessage;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
@@ -79,7 +80,7 @@ public class QueryTranslator {
private final SamzaSqlApplicationConfig sqlConfig;
private final StreamApplicationDescriptor streamAppDescriptor;
private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
- private final Map<String, MessageStream<KV<Object, Object>>> inputMsgStreams;
+ private final Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams;
private final Map<String, OutputStream> outputMsgStreams;
private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);
static int opId = 0;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 7f1ff39..e044f6f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -25,28 +25,29 @@ import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.core.TableScan;
import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.SamzaHistogram;
-import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.sql.SamzaSqlInputTransformer;
+import org.apache.samza.sql.SamzaSqlInputMessage;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-
/**
* Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph
* implementation
@@ -58,7 +59,7 @@ class ScanTranslator {
private final int queryId;
// FilterFunction to filter out any messages that are system specific.
- private static class FilterSystemMessageFunction implements FilterFunction<KV<Object, Object>> {
+ private static class FilterSystemMessageFunction implements FilterFunction<SamzaSqlInputMessage> {
private transient SamzaRelConverter relConverter;
private final String source;
private final int queryId;
@@ -76,8 +77,8 @@ class ScanTranslator {
}
@Override
- public boolean apply(KV<Object, Object> message) {
- return !relConverter.isSystemMessage(message);
+ public boolean apply(SamzaSqlInputMessage samzaSqlInputMessage) {
+ return !relConverter.isSystemMessage(samzaSqlInputMessage.getKeyAndMessageKV());
}
}
@@ -91,7 +92,7 @@ class ScanTranslator {
* ScanMapFUnction implements MapFunction to process input SamzaSqlRelMessages into output
* SamzaSqlRelMessage, performing the table scan
*/
- private static class ScanMapFunction implements MapFunction<KV<Object, Object>, SamzaSqlRelMessage> {
+ private static class ScanMapFunction implements MapFunction<SamzaSqlInputMessage, SamzaSqlRelMessage> {
// All the user-supplied functions are expected to be serializable in order to enable full serialization of user
// DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
// initialization.
@@ -125,10 +126,14 @@ class ScanTranslator {
}
@Override
- public SamzaSqlRelMessage apply(KV<Object, Object> message) {
+ public SamzaSqlRelMessage apply(SamzaSqlInputMessage samzaSqlInputMessage) {
Instant startProcessing = Instant.now();
- SamzaSqlRelMessage retMsg = this.msgConverter.convertToRelMessage(message);
- retMsg.getSamzaSqlRelMsgMetadata().setScanTime(startProcessing.toString());
+ /* SAMZA-2089/LISAMZA-10654: the SamzaRelConverter.convertToRelMessage currently does not initialize
+ * the samzaSqlRelMessage.samzaSqlRelMsgMetadata, this needs to be fixed */
+ SamzaSqlRelMessage retMsg = this.msgConverter.convertToRelMessage(samzaSqlInputMessage.getKeyAndMessageKV());
+ retMsg.setEventTime(samzaSqlInputMessage.getMetadata().getEventTime());
+ retMsg.setArrivalTime(samzaSqlInputMessage.getMetadata().getarrivalTime());
+ retMsg.setScanTime(startProcessing.toString());
updateMetrics(startProcessing, Instant.now());
return retMsg;
}
@@ -146,7 +151,7 @@ class ScanTranslator {
} // ScanMapFunction
void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId, final TranslatorContext context,
- Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<KV<Object, Object>>> inputMsgStreams) {
+ Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
List<String> tableNameParts = tableScan.getTable().getQualifiedName();
String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
@@ -169,15 +174,26 @@ class ScanTranslator {
return;
}
- KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
- DelegatingSystemDescriptor
- sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
- GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamId, noOpKVSerde);
+ // set the wrapper input transformer (SamzaSqlInputTransformer) in system descriptor
+ DelegatingSystemDescriptor systemDescriptor = systemDescriptors.get(systemName);
+ if (systemDescriptor == null) {
+ systemDescriptor = new DelegatingSystemDescriptor(systemName, new SamzaSqlInputTransformer());
+ systemDescriptors.put(systemName, systemDescriptor);
+ } else {
+ /* in SamzaSQL, there should be no systemDescriptor set up by the user, so this branch happens only
+ * in case of fan-out (i.e., same input stream used in multiple sql statements), or when the same input is
+ * used twice in the same sql statement (e.g., select ... from input as i1, input as i2 ...); otherwise, throw an error */
+ if (systemDescriptor.getTransformer().isPresent()) {
+ InputTransformer existingTransformer = systemDescriptor.getTransformer().get();
+ if (!(existingTransformer instanceof SamzaSqlInputTransformer)) {
+ throw new SamzaException("SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer");
+ }
+ }
+ }
- MessageStream<KV<Object, Object>> inputStream =
- inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd));
+ InputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, new NoOpSerde<>());
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
- inputStream
+ inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(inputDescriptor))
.filter(new FilterSystemMessageFunction(sourceName, queryId))
.map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));