You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/03 23:54:09 UTC
[samza] branch master updated: SAMZA-2178: Utils to directly inject
custom IME to InMemorySystem streams via TestRunner (#1015)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 1fbca05 SAMZA-2178: Utils to directly inject custom IME to InMemorySystem streams via TestRunner (#1015)
1fbca05 is described below
commit 1fbca051956898cd7dc51fe0cf4e43acd5b51ddc
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Fri May 3 16:54:04 2019 -0700
SAMZA-2178: Utils to directly inject custom IME to InMemorySystem streams via TestRunner (#1015)
* SAMZA-2178: Utils to directly inject custom IME to InMemorySystem streams via TestRunner
---
.../samza/system/inmemory/InMemoryManager.java | 21 ++++++++++++++++
.../system/inmemory/InMemorySystemProducer.java | 15 ++++++++++++
.../apache/samza/test/framework/TestRunner.java | 10 +++++---
.../test/framework/StreamTaskIntegrationTest.java | 28 ++++++++++++++++++++++
4 files changed, 71 insertions(+), 3 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index f46650f..2c2e9ae 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
@@ -76,6 +77,26 @@ class InMemoryManager {
}
/**
+ * Handles a produce request from {@link InMemorySystemProducer} for a job that builds its own IME, appending the
+ * envelope unchanged to the buffered messages of the given ssp.
+ * Note: within one ssp the envelope offsets must be consecutive, starting at "0" for the first envelope, i.e. each
+ * offset must equal the number of envelopes already buffered for that ssp; otherwise poll logic will be impacted.
+ *
+ * @param ssp system stream partition
+ * @param envelope incoming message envelope
+ * @throws SamzaException if the envelope's offset is not the next expected offset for the ssp
+ */
+ void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+ List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
+ String offset = String.valueOf(messages.size());
+ if (!offset.equals(envelope.getOffset())) {
+ throw new SamzaException(
+ String.format("Offset mismatch for ssp %s, expected %s found %s, please set the correct offset", ssp, offset, envelope.getOffset()));
+ }
+ messages.add(envelope);
+ }
+
+ /**
* Handles the poll request from {@link InMemorySystemConsumer}. It uses the input offset as the starting offset for
* each {@link SystemStreamPartition}.
*
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
index 872488d..246a558 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
@@ -21,6 +21,7 @@ package org.apache.samza.system.inmemory;
import com.google.common.base.Preconditions;
import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamPartition;
@@ -96,6 +97,20 @@ public class InMemorySystemProducer implements SystemProducer {
}
/**
+ * Appends a caller-built IME to the ssp it names, giving tests more control over how a partition is set up.
+ * For messages in the same system stream partition the offsets must be consecutive: "0" for the first message,
+ * then "1", "2", ... for the following ones (each offset equals the number of messages already in the ssp).
+ * Otherwise {@link InMemoryManager#put(SystemStreamPartition, IncomingMessageEnvelope)} will fail.
+ *
+ * Note: Please DO NOT use this in production use cases, this is only meant to set-up more flexible tests.
+ * This function is not thread safe.
+ * @param envelope incoming message envelope; its own system stream partition decides where it is stored
+ */
+ public void send(IncomingMessageEnvelope envelope) {
+ memoryManager.put(envelope.getSystemStreamPartition(), envelope);
+ }
+
+ /**
* If the SystemProducer buffers messages before sending them to its underlying system, it should flush those
* messages and leave no messages remaining to be sent.
*
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index c2fbfa7..6737488 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -56,12 +56,12 @@ import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.descriptors.StreamDescriptor;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
+import org.apache.samza.system.inmemory.InMemorySystemProducer;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;
import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
@@ -388,13 +388,17 @@ public class TestRunner {
SystemFactory factory = new InMemorySystemFactory();
Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
factory.getAdmin(systemName, config).createStream(spec);
- SystemProducer producer = factory.getProducer(systemName, config, null);
+ InMemorySystemProducer producer = (InMemorySystemProducer) factory.getProducer(systemName, config, null); // assumes InMemorySystemFactory always returns an InMemorySystemProducer
SystemStream sysStream = new SystemStream(systemName, streamName);
partitionData.forEach((partitionId, partition) -> {
partition.forEach(e -> {
Object key = e instanceof KV ? ((KV) e).getKey() : null;
Object value = e instanceof KV ? ((KV) e).getValue() : e;
- producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
+ if (value instanceof IncomingMessageEnvelope) { // custom IME: stored under its own SSP and offset as-is; partitionId and key are not used
+ producer.send((IncomingMessageEnvelope) value);
+ } else {
+ producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
+ }
});
producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null,
new EndOfStreamMessage(null)));
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 1b57352..2a58273 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.context.Context;
@@ -40,6 +41,7 @@ import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
@@ -191,6 +193,32 @@ public class StreamTaskIntegrationTest {
Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
genData(inputPartitionData, expectedOutputPartitionData);
+ syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionData, expectedOutputPartitionData);
+ }
+
+ @Test
+ public void testSyncTaskWithMultiplePartitionMultithreadedWithCustomIME() throws Exception {
+ Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
+ Map<Integer, List<KV>> inputPartitionIME = new HashMap<>();
+ Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+ genData(inputPartitionData, expectedOutputPartitionData);
+ // Re-wrap the generated data as custom IMEs with consecutive per-partition offsets ("0", "1", ...), as InMemoryManager#put requires.
+ for (Map.Entry<Integer, List<KV>> entry: inputPartitionData.entrySet()) {
+ Integer partitionId = entry.getKey();
+ List<KV> messages = entry.getValue();
+ SystemStreamPartition ssp = new SystemStreamPartition("test", "input", new Partition(partitionId));
+ inputPartitionIME.put(partitionId, new ArrayList<>());
+ int offset = 0;
+ for (KV message: messages) {
+ IncomingMessageEnvelope ime = new IncomingMessageEnvelope(ssp, String.valueOf(offset++), message.getKey(), message.getValue());
+ inputPartitionIME.get(partitionId).add(KV.of(message.getKey(), ime));
+ }
+ }
+ syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionIME, expectedOutputPartitionData);
+ }
+
+ void syncTaskWithMultiplePartitionMultithreadedHelper(Map<Integer, List<KV>> inputPartitionData,
+ Map<Integer, List<Integer>> expectedOutputPartitionData) throws Exception {
InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");