You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/03 23:54:09 UTC

[samza] branch master updated: SAMZA-2178: Utils to directly inject custom IME to InMemorySystem streams via TestRunner (#1015)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fbca05  SAMZA-2178: Utils to directly inject custom IME to InMemorySystem streams via TestRunner (#1015)
1fbca05 is described below

commit 1fbca051956898cd7dc51fe0cf4e43acd5b51ddc
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Fri May 3 16:54:04 2019 -0700

    SAMZA-2178: Utils to directly inject custom IME to InMemorySystem streams via TestRunner (#1015)
    
    * SAMZA-2178: Utils to directly inject custom IME to InMemorySystem streams via TestRunner
---
 .../samza/system/inmemory/InMemoryManager.java     | 21 ++++++++++++++++
 .../system/inmemory/InMemorySystemProducer.java    | 15 ++++++++++++
 .../apache/samza/test/framework/TestRunner.java    | 10 +++++---
 .../test/framework/StreamTaskIntegrationTest.java  | 28 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index f46650f..2c2e9ae 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
@@ -76,6 +77,26 @@ class InMemoryManager {
   }
 
   /**
+   * Handles a produce request from {@link InMemorySystemProducer} for a job that supplies a custom IME and
+   * appends that IME to the message queue buffered for the ssp.
+   * Note: the envelope offset must equal the number of envelopes already buffered for the ssp (0 for the first,
+   * then 1, 2, ...); any other offset is rejected with a {@link SamzaException} as it would impact poll logic.
+   *
+   * @param ssp system stream partition
+   * @param envelope incoming message envelope
+   */
+  void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+    List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
+    String offset = String.valueOf(messages.size());
+    if (!offset.equals(envelope.getOffset())) {
+      throw new SamzaException(
+          String.format("Offset mismatch for ssp %s, expected %s found %s, please set the correct offset", ssp, offset, envelope.getOffset()));
+    }
+    messages.add(envelope);
+  }
+
+
+  /**
    * Handles the poll request from {@link InMemorySystemConsumer}. It uses the input offset as the starting offset for
    * each {@link SystemStreamPartition}.
    *
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
index 872488d..246a558 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
@@ -21,6 +21,7 @@ package org.apache.samza.system.inmemory;
 
 import com.google.common.base.Preconditions;
 import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStreamPartition;
@@ -96,6 +97,20 @@ public class InMemorySystemProducer implements SystemProducer {
   }
 
   /**
+   * Sends an already-built IME to the ssp named in the envelope, giving tests direct control over what a
+   * partition of the in-memory stream contains. The envelope is handed unchanged to
+   * {@link InMemoryManager#put(SystemStreamPartition, IncomingMessageEnvelope)}, which validates its offset:
+   * offsets in one system stream partition are expected to start at 0 and increase with every message.
+   *
+   * Note: Please DO NOT use this in production use cases, this is only meant to set up more flexible tests.
+   * This function is not thread safe.
+   * @param envelope incoming message envelope, routed by its own system stream partition
+   */
+  public void send(IncomingMessageEnvelope envelope) {
+    memoryManager.put(envelope.getSystemStreamPartition(), envelope);
+  }
+
+  /**
    * If the SystemProducer buffers messages before sending them to its underlying system, it should flush those
    * messages and leave no messages remaining to be sent.
    *
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index c2fbfa7..6737488 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -56,12 +56,12 @@ import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.descriptors.StreamDescriptor;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
+import org.apache.samza.system.inmemory.InMemorySystemProducer;
 import org.apache.samza.task.AsyncStreamTask;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
@@ -388,13 +388,17 @@ public class TestRunner {
     SystemFactory factory = new InMemorySystemFactory();
     Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
     factory.getAdmin(systemName, config).createStream(spec);
-    SystemProducer producer = factory.getProducer(systemName, config, null);
+    InMemorySystemProducer producer = (InMemorySystemProducer) factory.getProducer(systemName, config, null);
     SystemStream sysStream = new SystemStream(systemName, streamName);
     partitionData.forEach((partitionId, partition) -> {
         partition.forEach(e -> {
             Object key = e instanceof KV ? ((KV) e).getKey() : null;
             Object value = e instanceof KV ? ((KV) e).getValue() : e;
-            producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
+            if (value instanceof IncomingMessageEnvelope) {
+              producer.send((IncomingMessageEnvelope) value);
+            } else {
+              producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
+            }
           });
         producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null,
             new EndOfStreamMessage(null)));
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 1b57352..2a58273 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.context.Context;
@@ -40,6 +41,7 @@ import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
 import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
@@ -191,6 +193,32 @@ public class StreamTaskIntegrationTest {
     Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
     Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
     genData(inputPartitionData, expectedOutputPartitionData);
+    syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionData, expectedOutputPartitionData);
+  }
+
+  @Test
+  public void testSyncTaskWithMultiplePartitionMultithreadedWithCustomIME() throws Exception {
+    Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
+    Map<Integer, List<KV>> inputPartitionIME = new HashMap<>();
+    Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+    genData(inputPartitionData, expectedOutputPartitionData);
+    // Wrap each generated message in an IME whose offset is its index within the partition (starting at 0).
+    for (Map.Entry<Integer, List<KV>> entry: inputPartitionData.entrySet()) {
+      Integer partitionId = entry.getKey();
+      List<KV> messages = entry.getValue();
+      SystemStreamPartition ssp = new SystemStreamPartition("test", "input", new Partition(partitionId));
+      inputPartitionIME.put(partitionId, new ArrayList<>());
+      int offset = 0;
+      for (KV message: messages) {
+        IncomingMessageEnvelope ime = new IncomingMessageEnvelope(ssp, String.valueOf(offset++), message.key, message.getValue());
+        inputPartitionIME.get(partitionId).add(KV.of(message.key, ime));
+      }
+    }
+    syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionIME, expectedOutputPartitionData);
+  }
+
+  void syncTaskWithMultiplePartitionMultithreadedHelper(Map<Integer, List<KV>> inputPartitionData,
+      Map<Integer, List<Integer>> expectedOutputPartitionData) throws Exception {
 
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");