You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/03/25 19:53:15 UTC
samza git commit: SAMZA-913: fix CoordinatorSystemConsumer bootstrap
missing messages issue
Repository: samza
Updated Branches:
refs/heads/master a51f7b2b9 -> f02386464
SAMZA-913: fix CoordinatorSystemConsumer bootstrap missing messages issue
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f0238646
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f0238646
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f0238646
Branch: refs/heads/master
Commit: f02386464d31b5a496bb0578838f51a0331bfffa
Parents: a51f7b2
Author: Jacob Maes <ja...@gmail.com>
Authored: Fri Mar 25 11:52:19 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Mar 25 11:52:19 2016 -0700
----------------------------------------------------------------------
.../stream/CoordinatorStreamSystemConsumer.java | 6 ++
.../TestCoordinatorStreamSystemConsumer.java | 63 ++++++++++++++++++--
2 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f0238646/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index e1a7626..8e1057b 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -152,6 +152,10 @@ public class CoordinatorStreamSystemConsumer {
}
CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+ // Remove any existing entry. Set.add() does not add if the element already exists.
+ if (bootstrappedStreamSet.remove(coordinatorStreamMessage)) {
+ log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
+ }
bootstrappedStreamSet.add(coordinatorStreamMessage);
if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
String configKey = coordinatorStreamMessage.getKey();
@@ -182,7 +186,9 @@ public class CoordinatorStreamSystemConsumer {
bootstrap();
LinkedHashSet<CoordinatorStreamMessage> bootstrappedStream = new LinkedHashSet<CoordinatorStreamMessage>();
for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) {
+ log.trace("Considering message: {}", coordinatorStreamMessage);
if (type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
+ log.trace("Adding message: {}", coordinatorStreamMessage);
bootstrappedStream.add(coordinatorStreamMessage);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f0238646/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 0e73e18..417772c 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -19,18 +19,14 @@
package org.apache.samza.coordinator.stream;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
@@ -44,6 +40,15 @@ import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anySet;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestCoordinatorStreamSystemConsumer {
@Test
public void testCoordinatorStreamSystemConsumer() {
@@ -110,6 +115,42 @@ public class TestCoordinatorStreamSystemConsumer {
return true;
}
+ /**
+ * Verifies that when a key-value pair is written, then a different value for the same key,
+ * then the original pair again, the re-written original occurs last in the bootstrapped set.
+ */
+ @Test
+ public void testOrderKeyRewrite() throws InterruptedException {
+ final SystemStream systemStream = new SystemStream("system", "stream");
+ final SystemStreamPartition ssp = new SystemStreamPartition(systemStream, new Partition(0));
+ final SystemConsumer systemConsumer = mock(SystemConsumer.class);
+ // Three writes to "key1": value1, value2, then value1 again (setConfig3 repeats setConfig1).
+ final List<IncomingMessageEnvelope> list = new ArrayList<>();
+ SetConfig setConfig1 = new SetConfig("source", "key1", "value1");
+ SetConfig setConfig2 = new SetConfig("source", "key1", "value2");
+ SetConfig setConfig3 = new SetConfig("source", "key1", "value1");
+ list.add(createIncomingMessageEnvelope(setConfig1, ssp));
+ list.add(createIncomingMessageEnvelope(setConfig2, ssp));
+ list.add(createIncomingMessageEnvelope(setConfig3, ssp));
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>() {
+ {
+ put(ssp, list);
+ }
+ };
+ when(systemConsumer.poll(anySet(), anyLong())).thenReturn(messages, Collections.<SystemStreamPartition, List<IncomingMessageEnvelope>>emptyMap()); // first poll yields the three envelopes, later polls an empty map
+
+ CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
+
+ consumer.bootstrap();
+
+ Set<CoordinatorStreamMessage> bootstrappedMessages = consumer.getBoostrappedStream();
+
+ assertEquals(2, bootstrappedMessages.size()); // The first message should have been removed as a duplicate of the third
+ CoordinatorStreamMessage[] coordinatorStreamMessages = bootstrappedMessages.toArray(new CoordinatorStreamMessage[2]);
+ assertEquals(setConfig2, coordinatorStreamMessages[0]);
+ assertEquals(setConfig3, coordinatorStreamMessages[1]); // setConfig3 MUST be the last message, not setConfig2
+ }
+
private static class MockSystemConsumer implements SystemConsumer {
private boolean started = false;
private boolean stopped = false;
@@ -172,4 +213,14 @@ public class TestCoordinatorStreamSystemConsumer {
return stopped;
}
}
+
+ private IncomingMessageEnvelope createIncomingMessageEnvelope(CoordinatorStreamMessage message, SystemStreamPartition ssp) {
+ try { // Serialize the message's key array and message map to JSON, encoded as UTF-8 bytes
+ byte[] key = SamzaObjectMapper.getObjectMapper().writeValueAsString(message.getKeyArray()).getBytes("UTF-8");
+ byte[] value = SamzaObjectMapper.getObjectMapper().writeValueAsString(message.getMessageMap()).getBytes("UTF-8");
+ return new IncomingMessageEnvelope(ssp, null, key, value); // second argument (presumably the offset) is left null
+ } catch (Exception e) {
+ throw new SamzaException("Failed to serialize coordinator stream message: " + message, e); // fail here rather than hand a null envelope to the caller
+ }
+ }
}