You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/06/15 22:26:44 UTC

samza git commit: SAMZA-699: fixed CoordinatorStreamMessages losing their order when being consumed.

Repository: samza
Updated Branches:
  refs/heads/master b1586413c -> 0fc6ad255


SAMZA-699: fixed CoordinatorStreamMessages losing their order when being consumed.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0fc6ad25
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0fc6ad25
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0fc6ad25

Branch: refs/heads/master
Commit: 0fc6ad255a48527cdcaa45b6bcc19f66ca9e08f9
Parents: b158641
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Mon Jun 15 13:26:06 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Jun 15 13:26:06 2015 -0700

----------------------------------------------------------------------
 .../stream/CoordinatorStreamSystemConsumer.java |  5 ++--
 .../TestCoordinatorStreamSystemConsumer.java    | 31 +++++++++++++++++---
 2 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0fc6ad25/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 27e0750..b1078bd 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -21,6 +21,7 @@ package org.apache.samza.coordinator.stream;
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -57,7 +58,7 @@ public class CoordinatorStreamSystemConsumer {
   private final Map<String, String> configMap;
   private boolean isBootstrapped;
   private boolean isStarted;
-  private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new HashSet<CoordinatorStreamMessage>();
+  private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new LinkedHashSet<CoordinatorStreamMessage>();
 
   public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
     this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
@@ -172,7 +173,7 @@ public class CoordinatorStreamSystemConsumer {
   public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) {
     log.debug("Bootstrapping coordinator stream for messages of type {}", type);
     bootstrap();
-    HashSet<CoordinatorStreamMessage> bootstrappedStream = new HashSet<CoordinatorStreamMessage>();
+    LinkedHashSet<CoordinatorStreamMessage> bootstrappedStream = new LinkedHashSet<CoordinatorStreamMessage>();
     for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) {
       if (type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
         bootstrappedStream.add(coordinatorStreamMessage);

http://git-wip-us.apache.org/repos/asf/samza/blob/0fc6ad25/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 2fe872b..c25f6a7 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -20,12 +20,13 @@
 package org.apache.samza.coordinator.stream;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -43,7 +44,7 @@ import org.junit.Test;
 public class TestCoordinatorStreamSystemConsumer {
   @Test
   public void testCoordinatorStreamSystemConsumer() {
-    Map<String, String> expectedConfig = new HashMap<String, String>();
+    Map<String, String> expectedConfig = new LinkedHashMap<String, String>();
     expectedConfig.put("job.id", "1234");
     SystemStream systemStream = new SystemStream("system", "stream");
     MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
@@ -61,12 +62,34 @@ public class TestCoordinatorStreamSystemConsumer {
       // Expected.
     }
     consumer.bootstrap();
+    assertTrue(testOrder(consumer.getBoostrappedStream()));
     assertEquals(expectedConfig, consumer.getConfig());
     assertFalse(systemConsumer.isStopped());
     consumer.stop();
     assertTrue(systemConsumer.isStopped());
   }
 
+  private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) {
+    int initialSize = bootstrappedStreamSet.size();
+    List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>();
+    listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order1", "job.name.order1", "my-order1-name"));
+    listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order2", "job.name.order2", "my-order2-name"));
+    listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order3", "job.name.order3", "my-order3-name"));
+    bootstrappedStreamSet.addAll(listStreamMessages);
+    Iterator<CoordinatorStreamMessage> iter = bootstrappedStreamSet.iterator();
+
+    for (int i = 0;  i < initialSize; ++i) {
+      iter.next();
+    }
+    int i = 0;
+    while (iter.hasNext()) {
+      if (!iter.next().getKey().equals(listStreamMessages.get(i++).getKey())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private static class MockSystemConsumer implements SystemConsumer {
     private boolean started = false;
     private boolean stopped = false;
@@ -96,7 +119,7 @@ public class TestCoordinatorStreamSystemConsumer {
     }
 
     public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
-      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
       assertEquals(1, systemStreamPartitions.size());
       SystemStreamPartition systemStreamPartition = systemStreamPartitions.iterator().next();
       assertEquals(expectedSystemStreamPartition, systemStreamPartition);