You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/01/31 00:22:59 UTC

[samza] branch master updated: SAMZA-2029: Samza-SQL Diagnostics: propagate eventTime and assign arrivalTime

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new efc0dca  SAMZA-2029: Samza-SQL Diagnostics: propagate eventTime and assign arrivalTime
     new fc23a1f  Merge pull request #863 from shenodaguirguis/eventtime
efc0dca is described below

commit efc0dca50cd8e20cb22551a48f187e52bd6361eb
Author: Shenoda Guirguis <sg...@linkedin.com>
AuthorDate: Fri Dec 7 11:27:03 2018 -0800

    SAMZA-2029: Samza-SQL Diagnostics: propagate eventTime and assign arrivalTime
---
 .../samza/system/IncomingMessageEnvelope.java      | 45 +++++++++++++---
 .../descriptors/DelegatingSystemDescriptor.java    | 12 ++++-
 .../samza/system/inmemory/InMemoryManager.java     |  4 +-
 .../apache/samza/serializers/SerdeManager.scala    |  5 +-
 .../system/hdfs/reader/MultiFileHdfsReader.java    |  3 +-
 .../samza/system/kafka/KafkaConsumerProxy.java     |  4 +-
 .../kafka_deprecated/KafkaSystemConsumer.scala     | 20 +++----
 .../org/apache/samza/sql/SamzaSqlInputMessage.java | 61 ++++++++++++++++++++++
 .../apache/samza/sql/SamzaSqlInputTransformer.java | 48 +++++++++++++++++
 .../apache/samza/sql/data/SamzaSqlRelMessage.java  | 13 ++++-
 .../samza/sql/translator/QueryTranslator.java      |  3 +-
 .../samza/sql/translator/ScanTranslator.java       | 54 ++++++++++++-------
 12 files changed, 230 insertions(+), 42 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index c5aed31..ae8335e 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -20,6 +20,7 @@
 package org.apache.samza.system;
 
 import java.nio.charset.Charset;
+import java.time.Instant;
 
 /**
  * This class represents a message envelope that is received by a StreamTask for each message that is received from a
@@ -36,7 +37,10 @@ public class IncomingMessageEnvelope {
   private final Object key;
   private final Object message;
   private final int size;
-  private long timestamp = 0L;
+  // the timestamp when this event occured, should be set by the event source, 0 means unassigned
+  private long eventTime = 0L;
+  // the timestamp when this event is pickedup by samza, 0 means unassgined
+  private long arrivalTime = 0L;
 
   /**
    * Constructs a new IncomingMessageEnvelope from specified components.
@@ -66,14 +70,41 @@ public class IncomingMessageEnvelope {
     this.key = key;
     this.message = message;
     this.size = size;
+    this.arrivalTime = Instant.now().toEpochMilli();
   }
 
-  public void setTimestamp(long timestamp) {
-    this.timestamp = timestamp;
+  /**
+   * Constructs a new IncomingMessageEnvelope from specified components
+   * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
+   * from which the stream came, and the partition of the stream from which the message was received.
+   * @param offset The offset in the partition that the message was received from.
+   * @param key A deserialized key received from the partition offset.
+   * @param message A deserialized message received from the partition offset.
+   * @param size size of the message and key in bytes.
+   * @param eventTime the timestamp (in epochMillis) of when this event happened
+   * @param arrivalTime the timestamp (in epochMillis) of when this event arrived to (i.e., was picked-up by) Samza
+   */
+  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset,
+      Object key, Object message, int size, long eventTime, long arrivalTime) {
+    this(systemStreamPartition, offset, key, message, size);
+    this.eventTime = eventTime;
+    this.arrivalTime = arrivalTime;
   }
 
-  public long getTimestamp() {
-    return timestamp;
+  /**
+   * Getter for event time
+   * @return this.eventTime
+   */
+  public long getEventTime() {
+    return eventTime;
+  }
+
+  /**
+   * Getter for arrival time
+   * @return this.arrivalTime
+   */
+  public long getArrivalTime() {
+    return arrivalTime;
   }
 
   public SystemStreamPartition getSystemStreamPartition() {
@@ -159,6 +190,8 @@ public class IncomingMessageEnvelope {
 
   @Override
   public String toString() {
-    return "IncomingMessageEnvelope [systemStreamPartition=" + systemStreamPartition + ", offset=" + offset + ", key=" + key + ", message=" + message + "]";
+    return "IncomingMessageEnvelope [systemStreamPartition=" + systemStreamPartition + ", offset=" + offset +
+        ", key=" + key + ", message=" + message + ", eventTime=" + eventTime +
+        ", arrivalTime=" + arrivalTime + "]";
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java b/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
index aa0f6a4..018b710 100644
--- a/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/system/descriptors/DelegatingSystemDescriptor.java
@@ -18,9 +18,7 @@
  */
 package org.apache.samza.system.descriptors;
 
-
 import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.samza.serializers.Serde;
 
 /**
@@ -47,6 +45,16 @@ public final class DelegatingSystemDescriptor extends SystemDescriptor<Delegatin
     super(systemName, null, null, null);
   }
 
+  /**
+   * Constructs and {@link DelegatingSystemDescriptor} instance with given transformer and no system
+   * level Serde. Serdes are to be provided at stream level when getting input/output descriptors.
+   * @param systemName
+   * @param transformer
+   */
+  public DelegatingSystemDescriptor(String systemName, InputTransformer transformer) {
+    super(systemName, null, transformer, null);
+  }
+
   @Override
   public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
       String streamId, Serde<StreamMessageType> serde) {
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index 8463e56..f46650f 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.system.inmemory;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -68,7 +69,8 @@ class InMemoryManager {
       offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
     }
 
-    IncomingMessageEnvelope messageEnvelope = new IncomingMessageEnvelope(ssp, offset, key, message);
+    IncomingMessageEnvelope messageEnvelope = new IncomingMessageEnvelope(ssp, offset, key, message,
+        0, 0L, Instant.now().toEpochMilli());
     bufferedMessages.get(ssp)
         .add(messageEnvelope);
   }
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
index 60c325d..b6011f4 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -148,7 +148,10 @@ class SerdeManager(
         envelope.getSystemStreamPartition,
         envelope.getOffset,
         key,
-        message)
+        message,
+        envelope.getSize,
+        envelope.getEventTime(),
+        envelope.getArrivalTime)
     }
   }
 }
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
index 7870713..eea68bb 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
@@ -160,7 +160,8 @@ public class MultiFileHdfsReader {
     IncomingMessageEnvelope messageEnvelope = curReader.readNext();
     // Copy everything except for the offset. Turn the single-file style offset into a multi-file one
     return new IncomingMessageEnvelope(messageEnvelope.getSystemStreamPartition(), getCurOffset(),
-      messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize());
+      messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize(),
+      messageEnvelope.getEventTime(), messageEnvelope.getArrivalTime());
   }
 
   /**
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index e47add7..0ea16e7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -21,6 +21,7 @@
 
 package org.apache.samza.system.kafka;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -312,7 +313,8 @@ class KafkaConsumerProxy<K, V> {
       K key = record.key();
       Object value = record.value();
       IncomingMessageEnvelope imEnvelope =
-          new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record));
+          new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record),
+              record.timestamp(), Instant.now().toEpochMilli());
       messages.add(imEnvelope);
     }
     if (LOG.isDebugEnabled()) {
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
index b7c4368..126765a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.system.kafka_deprecated
 
+import java.time.Instant
 import kafka.common.TopicAndPartition
 import org.apache.samza.util.Logging
 import kafka.message.Message
@@ -283,15 +284,16 @@ private[kafka_deprecated] class KafkaSystemConsumer(
         null
       }
 
-      if(fetchLimitByBytesEnabled ) {
-        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))
-        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
-        put(systemStreamPartition, ime)
-      } else {
-        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)
-        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
-        put(systemStreamPartition, ime)
-      }
+      val eventTime = if (!msg.message.isNull) msg.message.timestamp else 0L
+      val arrivalTime = Instant.now().toEpochMilli()
+      val ime = if(fetchLimitByBytesEnabled ) {
+          new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message),
+          eventTime, arrivalTime)
+        } else {
+          new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, 0,
+            eventTime, arrivalTime)
+        }
+      put(systemStreamPartition, ime)
 
       setIsAtHead(systemStreamPartition, isAtHead)
     }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputMessage.java
new file mode 100644
index 0000000..ad6706c
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql;
+
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
+
+
+/**
+ * Represents the message transformed from IncomingMessageEnvelope by SamzaSqlInputTransformer
+ * it contains the event message and its metadata
+ */
+public class SamzaSqlInputMessage {
+  private final KV<Object, Object> keyAndMessageKV;
+  private final SamzaSqlRelMsgMetadata metadata;
+
+  /**
+   * Constructs a new SamzaSqMessage given the input arguments
+   * @param keyAndMessageKV
+   * @param metadata
+   */
+  private SamzaSqlInputMessage(KV<Object, Object> keyAndMessageKV, SamzaSqlRelMsgMetadata metadata) {
+    this.keyAndMessageKV = keyAndMessageKV;
+    this.metadata = metadata;
+  }
+
+  /**
+   * Constructs a new SamzaSqMessage given the input arguments
+   * @param keyAndMessageKV
+   * @param metadata
+   * @return
+   */
+  public static SamzaSqlInputMessage of (KV<Object, Object> keyAndMessageKV, SamzaSqlRelMsgMetadata metadata) {
+    return new SamzaSqlInputMessage(keyAndMessageKV, metadata);
+  }
+
+  public KV<Object, Object> getKeyAndMessageKV() {
+    return keyAndMessageKV;
+  }
+
+  public SamzaSqlRelMsgMetadata getMetadata() {
+    return metadata;
+  }
+}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputTransformer.java b/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputTransformer.java
new file mode 100644
index 0000000..948dff2
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/SamzaSqlInputTransformer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql;
+
+import java.time.Instant;
+import net.jodah.failsafe.internal.util.Assert;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.descriptors.InputTransformer;
+
+
+/**
+ * SamzaSqlInputTransformer:
+ *   Input Transformer for SamzaSQL that consumes {@link IncomingMessageEnvelope} (IME) and produces
+ *   {@link SamzaSqlInputMessage} so that the event metadata (currently eventTime and arrivalTime) are copied
+ *   from the IME to the SamzaSQL layer through the {@link SamzaSqlInputMessage}
+ */
+
+public class SamzaSqlInputTransformer implements InputTransformer {
+
+  @Override
+  public Object apply(IncomingMessageEnvelope ime) {
+    Assert.notNull(ime, "ime is null");
+    KV<Object, Object> keyAndMessageKV = KV.of(ime.getKey(), ime.getMessage());
+    SamzaSqlRelMsgMetadata metadata = new SamzaSqlRelMsgMetadata(Instant.ofEpochMilli(ime.getEventTime()).toString(),
+        Instant.ofEpochMilli(ime.getArrivalTime()).toString(), null);
+    SamzaSqlInputMessage samzaMsg = SamzaSqlInputMessage.of(keyAndMessageKV, metadata);
+    return  samzaMsg;
+  }
+}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index eb810d4..0ffc845 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -20,7 +20,6 @@
 package org.apache.samza.sql.data;
 
 import java.io.Serializable;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -127,6 +126,18 @@ public class SamzaSqlRelMessage implements Serializable {
     return key;
   }
 
+  public void setEventTime(String eventTime) {
+    this.samzaSqlRelMsgMetadata.setEventTime(eventTime);
+  }
+
+  public void setArrivalTime(String arrivalTime) {
+    this.samzaSqlRelMsgMetadata.setArrivalTime(arrivalTime);
+  }
+
+  public void setScanTime(String scanTime) {
+    this.samzaSqlRelMsgMetadata.setScanTime(scanTime);
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(key, samzaSqlRelRecord);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 0fdfd39..d3c8fa9 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -53,6 +53,7 @@ import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.sql.SamzaSqlInputMessage;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
@@ -79,7 +80,7 @@ public class QueryTranslator {
   private final SamzaSqlApplicationConfig sqlConfig;
   private final StreamApplicationDescriptor streamAppDescriptor;
   private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
-  private final Map<String, MessageStream<KV<Object, Object>>> inputMsgStreams;
+  private final Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams;
   private final Map<String, OutputStream> outputMsgStreams;
   private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);
   static int opId = 0;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 7f1ff39..e044f6f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -25,28 +25,29 @@ import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.SamzaHistogram;
-import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.sql.SamzaSqlInputTransformer;
+import org.apache.samza.sql.SamzaSqlInputMessage;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.InputTransformer;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 
-
 /**
  * Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph
  * implementation
@@ -58,7 +59,7 @@ class ScanTranslator {
   private final int queryId;
 
   // FilterFunction to filter out any messages that are system specific.
-  private static class FilterSystemMessageFunction implements FilterFunction<KV<Object, Object>> {
+  private static class FilterSystemMessageFunction implements FilterFunction<SamzaSqlInputMessage> {
     private transient SamzaRelConverter relConverter;
     private final String source;
     private final int queryId;
@@ -76,8 +77,8 @@ class ScanTranslator {
     }
 
     @Override
-    public boolean apply(KV<Object, Object> message) {
-      return !relConverter.isSystemMessage(message);
+    public boolean apply(SamzaSqlInputMessage samzaSqlInputMessage) {
+      return !relConverter.isSystemMessage(samzaSqlInputMessage.getKeyAndMessageKV());
     }
   }
 
@@ -91,7 +92,7 @@ class ScanTranslator {
    * ScanMapFUnction implements MapFunction to process input SamzaSqlRelMessages into output
    * SamzaSqlRelMessage, performing the table scan
    */
-  private static class ScanMapFunction implements MapFunction<KV<Object, Object>, SamzaSqlRelMessage> {
+  private static class ScanMapFunction implements MapFunction<SamzaSqlInputMessage, SamzaSqlRelMessage> {
     // All the user-supplied functions are expected to be serializable in order to enable full serialization of user
     // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
     // initialization.
@@ -125,10 +126,14 @@ class ScanTranslator {
     }
 
     @Override
-    public SamzaSqlRelMessage apply(KV<Object, Object> message) {
+    public SamzaSqlRelMessage apply(SamzaSqlInputMessage samzaSqlInputMessage) {
       Instant startProcessing = Instant.now();
-      SamzaSqlRelMessage retMsg = this.msgConverter.convertToRelMessage(message);
-      retMsg.getSamzaSqlRelMsgMetadata().setScanTime(startProcessing.toString());
+      /* SAMZA-2089/LISAMZA-10654: the SamzaRelConverter.convertToRelMessage currently does not initialize
+       *                           the samzaSqlRelMessage.samzaSqlRelMsgMetadata, this needs to be fixed */
+      SamzaSqlRelMessage retMsg = this.msgConverter.convertToRelMessage(samzaSqlInputMessage.getKeyAndMessageKV());
+      retMsg.setEventTime(samzaSqlInputMessage.getMetadata().getEventTime());
+      retMsg.setArrivalTime(samzaSqlInputMessage.getMetadata().getarrivalTime());
+      retMsg.setScanTime(startProcessing.toString());
       updateMetrics(startProcessing, Instant.now());
       return retMsg;
     }
@@ -146,7 +151,7 @@ class ScanTranslator {
   } // ScanMapFunction
 
   void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId, final TranslatorContext context,
-      Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<KV<Object, Object>>> inputMsgStreams) {
+      Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
     StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
     String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
@@ -169,15 +174,26 @@ class ScanTranslator {
       return;
     }
 
-    KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
-    DelegatingSystemDescriptor
-        sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
-    GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamId, noOpKVSerde);
+    // set the wrapper input transformer (SamzaSqlInputTransformer) in system descriptor
+    DelegatingSystemDescriptor systemDescriptor = systemDescriptors.get(systemName);
+    if (systemDescriptor == null) {
+      systemDescriptor = new DelegatingSystemDescriptor(systemName, new SamzaSqlInputTransformer());
+      systemDescriptors.put(systemName, systemDescriptor);
+    } else {
+      /* in SamzaSQL, there should be no systemDescriptor setup by user, so this branch happens only
+      * in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input
+      * used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */
+      if (systemDescriptor.getTransformer().isPresent()) {
+        InputTransformer existingTransformer = systemDescriptor.getTransformer().get();
+        if (!(existingTransformer instanceof SamzaSqlInputTransformer)) {
+          throw new SamzaException("SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer");
+        }
+      }
+    }
 
-    MessageStream<KV<Object, Object>> inputStream =
-        inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd));
+    InputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, new NoOpSerde<>());
     MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
-        inputStream
+        inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(inputDescriptor))
             .filter(new FilterSystemMessageFunction(sourceName, queryId))
             .map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));