You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/07 20:16:27 UTC

samza git commit: SAMZA-1477: Fix issues found by BEAM tests

Repository: samza
Updated Branches:
  refs/heads/master 2d1073249 -> 52d8ddd6e


SAMZA-1477: Fix issues found by BEAM tests

A number of issues were found by the BEAM tests, including:

1) WatermarkFunction needs to be able to return output after processWatermark()
2) ControlMessage doesn't implement equals() and hashCode()
3) Some Kafka system related code is not Scala 2.10 compatible for tests.

Author: xiliu <xi...@xiliu-ld1.linkedin.biz>

Reviewers: Prateek

Closes #345 from xinyuiscool/SAMZA-1477


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/52d8ddd6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/52d8ddd6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/52d8ddd6

Branch: refs/heads/master
Commit: 52d8ddd6e1edeff72c22602f76fc0f014453c0df
Parents: 2d10732
Author: Xinyu Liu <xi...@gmail.com>
Authored: Tue Nov 7 12:16:15 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Nov 7 12:16:15 2017 -0800

----------------------------------------------------------------------
 .../operators/functions/WatermarkFunction.java  |  7 ++-
 .../org/apache/samza/system/ControlMessage.java | 31 +++++++++++++
 .../apache/samza/system/WatermarkMessage.java   | 26 +++++++++++
 .../samza/operators/impl/OperatorImpl.java      | 49 ++++++++++++++++----
 .../operators/impl/PartitionByOperatorImpl.java |  4 +-
 .../samza/runtime/LocalApplicationRunner.java   |  2 +-
 .../system/kafka/KafkaSystemConsumer.scala      |  8 ++--
 .../system/kafka/TestKafkaSystemAdmin.scala     |  2 +-
 8 files changed, 110 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/52d8ddd6/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
index 3be293e..97c8591 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
@@ -19,18 +19,21 @@
 
 package org.apache.samza.operators.functions;
 
+import java.util.Collection;
+
 /**
  * Allows handling of watermarks.
  */
-public interface WatermarkFunction {
+public interface WatermarkFunction<T> {
 
   /**
    * Processes the input watermark coming from upstream operators.
    * This allows custom watermark handling, such as triggering events or propagating it downstream.
    *
    * @param watermark input watermark
+   * @return output triggered after processing the watermark
    */
-  void processWatermark(long watermark);
+  Collection<T> processWatermark(long watermark);
 
   /**
    * Returns the output watermark. This function will be invoked immediately after either

http://git-wip-us.apache.org/repos/asf/samza/blob/52d8ddd6/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java b/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
index 4ec58b4..bcabd1c 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
@@ -43,4 +43,35 @@ public abstract class ControlMessage {
   public int getVersion() {
     return version;
   }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = Integer.hashCode(version);
+    result = prime * result + (taskName != null ? taskName.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+
+    final ControlMessage other = (ControlMessage) obj;
+    if (version != other.version) {
+      return false;
+    }
+
+    if (taskName != null
+        ? !taskName.equals(other.getTaskName())
+        : other.taskName != null) {
+      return false;
+    }
+
+    return true;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/52d8ddd6/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java b/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
index 7278c5c..ba41724 100644
--- a/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
+++ b/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
@@ -43,4 +43,30 @@ public class WatermarkMessage extends ControlMessage {
   public long getTimestamp() {
     return timestamp;
   }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = super.hashCode();
+    result = prime * result + Long.hashCode(timestamp);
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    if (!super.equals(obj))
+      return false;
+
+    final WatermarkMessage other = (WatermarkMessage) obj;
+    if (timestamp != other.timestamp) {
+      return false;
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/52d8ddd6/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index f5a2624..96dcd89 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -23,6 +23,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.metrics.Counter;
@@ -64,11 +66,16 @@ public abstract class OperatorImpl<M, RM> {
   private long inputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
   private long outputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
   private TaskName taskName;
+  // Although the operator node is in the operator graph, the current task may not consume any message in it.
+  // This can be caused by none of the input stream partitions of this op is assigned to the current task.
+  // It's important to know so we can populate the watermarks correctly.
+  private boolean usedInCurrentTask = false;
 
   Set<OperatorImpl<RM, ?>> registeredOperators;
   Set<OperatorImpl<?, M>> prevOperators;
   Set<SystemStream> inputStreams;
 
+  private TaskModel taskModel;
   // end-of-stream states
   private EndOfStreamStates eosStates;
   // watermark states
@@ -104,6 +111,9 @@ public abstract class OperatorImpl<M, RM> {
     TaskContextImpl taskContext = (TaskContextImpl) context;
     this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName());
     this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName());
+    ContainerModel containerModel = taskContext.getJobModel().getContainers()
+        .get(context.getSamzaContainerContext().id);
+    this.taskModel = containerModel.getTasks().get(taskName);
 
     handleInit(config, context);
 
@@ -139,6 +149,9 @@ public abstract class OperatorImpl<M, RM> {
 
   void registerInputStream(SystemStream input) {
     this.inputStreams.add(input);
+
+    usedInCurrentTask = usedInCurrentTask
+        || taskModel.getSystemStreamPartitions().stream().anyMatch(ssp -> ssp.getSystemStream().equals(input));
   }
 
   /**
@@ -320,15 +333,23 @@ public abstract class OperatorImpl<M, RM> {
       LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOpImplId());
 
       final Long outputWm;
-      WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
+      final Collection<RM> output;
+      final WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
       if (watermarkFn != null) {
         // user-overrided watermark handling here
-        watermarkFn.processWatermark(inputWatermark);
+        output = (Collection<RM>) watermarkFn.processWatermark(inputWatermark);
         outputWm = watermarkFn.getOutputWatermark();
       } else {
         // use samza-provided watermark handling
         // default is to propagate the input watermark
-        outputWm = handleWatermark(inputWatermark, collector, coordinator);
+        output = handleWatermark(inputWatermark, collector, coordinator);
+        outputWm = getOutputWatermark();
+      }
+
+      if (!output.isEmpty()) {
+        output.forEach(rm ->
+            this.registeredOperators.forEach(op ->
+                op.onMessage(rm, collector, coordinator)));
       }
 
       propagateWatermark(outputWm, collector, coordinator);
@@ -357,9 +378,9 @@ public abstract class OperatorImpl<M, RM> {
    * @param coordinator task coordinator
    * @return output watermark, or null if the output watermark should not be updated.
    */
-  protected Long handleWatermark(long inputWatermark, MessageCollector collector, TaskCoordinator coordinator) {
-    // Default is no handling. Simply pass on the input watermark as output.
-    return inputWatermark;
+  protected Collection<RM> handleWatermark(long inputWatermark, MessageCollector collector, TaskCoordinator coordinator) {
+    // Default is no handling. Output is empty.
+    return Collections.emptyList();
   }
 
   /* package private for testing */
@@ -367,9 +388,19 @@ public abstract class OperatorImpl<M, RM> {
     return this.inputWatermark;
   }
 
-  /* package private for testing */
-  final long getOutputWatermark() {
-    return this.outputWatermark;
+  /**
+   * Returns the output watermark, default is the same as input.
+   * Operators which keep track of watermark should override this to return the current watermark.
+   * @return output watermark
+   */
+  protected long getOutputWatermark() {
+    if (usedInCurrentTask) {
+      // default as input
+      return getInputWatermark();
+    } else {
+      // always emit the max to indicate no input will be emitted afterwards
+      return Long.MAX_VALUE;
+    }
   }
 
   public void close() {

http://git-wip-us.apache.org/repos/asf/samza/blob/52d8ddd6/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 28b8dba..3811da8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -96,9 +96,9 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
   }
 
   @Override
-  protected Long handleWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) {
+  protected Collection<Void> handleWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) {
     sendControlMessage(new WatermarkMessage(watermark, taskName), collector);
-    return watermark;
+    return Collections.emptyList();
   }
 
   private void sendControlMessage(ControlMessage message, MessageCollector collector) {

http://git-wip-us.apache.org/repos/asf/samza/blob/52d8ddd6/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index ff0299d..e9b6bc8 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -258,7 +258,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       Config config,
       StreamApplication app,
       StreamProcessorLifecycleListener listener) {
-    Object taskFactory = TaskFactoryUtil.createTaskFactory(config, app, this);
+    Object taskFactory = TaskFactoryUtil.createTaskFactory(config, app, new LocalApplicationRunner(config));
     if (taskFactory instanceof StreamTaskFactory) {
       return new StreamProcessor(
           config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);

http://git-wip-us.apache.org/repos/asf/samza/blob/52d8ddd6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index e033b53..3530713 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -193,8 +193,8 @@ private[kafka] class KafkaSystemConsumer(
 
         // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
         // This avoids trying to re-add the same topic partition repeatedly
-        def refresh(tp: List[TopicAndPartition]) = {
-          val head :: rest = tpToRefresh
+        def refresh() = {
+          val head = tpToRefresh.head
           // refreshBrokers can be called from abdicate and refreshDropped,
           // both of which are triggered from BrokerProxy threads. To prevent
           // accidentally creating multiple objects for the same broker, or
@@ -217,11 +217,11 @@ private[kafka] class KafkaSystemConsumer(
               case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head)
             }
           }
-          rest
+          tpToRefresh.tail
         }
 
         while (!tpToRefresh.isEmpty) {
-          tpToRefresh = refresh(tpToRefresh)
+          tpToRefresh = refresh()
         }
 
         loop.done

http://git-wip-us.apache.org/repos/asf/samza/blob/52d8ddd6/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 762e49e..65c43f5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -141,7 +141,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   }
 
   def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
-    new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation)
+    new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
   }
 
 }