You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/06/15 22:26:44 UTC
samza git commit: SAMZA-699: fixed CoordinatorStreamMessages lose
orders when being consumed.
Repository: samza
Updated Branches:
refs/heads/master b1586413c -> 0fc6ad255
SAMZA-699: fixed CoordinatorStreamMessages lose orders when being consumed.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0fc6ad25
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0fc6ad25
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0fc6ad25
Branch: refs/heads/master
Commit: 0fc6ad255a48527cdcaa45b6bcc19f66ca9e08f9
Parents: b158641
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Mon Jun 15 13:26:06 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Jun 15 13:26:06 2015 -0700
----------------------------------------------------------------------
.../stream/CoordinatorStreamSystemConsumer.java | 5 ++--
.../TestCoordinatorStreamSystemConsumer.java | 31 +++++++++++++++++---
2 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/0fc6ad25/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 27e0750..b1078bd 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -21,6 +21,7 @@ package org.apache.samza.coordinator.stream;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -57,7 +58,7 @@ public class CoordinatorStreamSystemConsumer {
private final Map<String, String> configMap;
private boolean isBootstrapped;
private boolean isStarted;
- private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new HashSet<CoordinatorStreamMessage>();
+ private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new LinkedHashSet<CoordinatorStreamMessage>();
public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
@@ -172,7 +173,7 @@ public class CoordinatorStreamSystemConsumer {
public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) {
log.debug("Bootstrapping coordinator stream for messages of type {}", type);
bootstrap();
- HashSet<CoordinatorStreamMessage> bootstrappedStream = new HashSet<CoordinatorStreamMessage>();
+ LinkedHashSet<CoordinatorStreamMessage> bootstrappedStream = new LinkedHashSet<CoordinatorStreamMessage>();
for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) {
if (type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
bootstrappedStream.add(coordinatorStreamMessage);
http://git-wip-us.apache.org/repos/asf/samza/blob/0fc6ad25/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 2fe872b..c25f6a7 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -20,12 +20,13 @@
package org.apache.samza.coordinator.stream;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -43,7 +44,7 @@ import org.junit.Test;
public class TestCoordinatorStreamSystemConsumer {
@Test
public void testCoordinatorStreamSystemConsumer() {
- Map<String, String> expectedConfig = new HashMap<String, String>();
+ Map<String, String> expectedConfig = new LinkedHashMap<String, String>();
expectedConfig.put("job.id", "1234");
SystemStream systemStream = new SystemStream("system", "stream");
MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
@@ -61,12 +62,34 @@ public class TestCoordinatorStreamSystemConsumer {
// Expected.
}
consumer.bootstrap();
+ assertTrue(testOrder(consumer.getBoostrappedStream()));
assertEquals(expectedConfig, consumer.getConfig());
assertFalse(systemConsumer.isStopped());
consumer.stop();
assertTrue(systemConsumer.isStopped());
}
+ private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) {
+ int initialSize = bootstrappedStreamSet.size();
+ List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>();
+ listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order1", "job.name.order1", "my-order1-name"));
+ listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order2", "job.name.order2", "my-order2-name"));
+ listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order3", "job.name.order3", "my-order3-name"));
+ bootstrappedStreamSet.addAll(listStreamMessages);
+ Iterator<CoordinatorStreamMessage> iter = bootstrappedStreamSet.iterator();
+
+ for (int i = 0; i < initialSize; ++i) {
+ iter.next();
+ }
+ int i = 0;
+ while (iter.hasNext()) {
+ if (!iter.next().getKey().equals(listStreamMessages.get(i++).getKey())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private static class MockSystemConsumer implements SystemConsumer {
private boolean started = false;
private boolean stopped = false;
@@ -96,7 +119,7 @@ public class TestCoordinatorStreamSystemConsumer {
}
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
- Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
assertEquals(1, systemStreamPartitions.size());
SystemStreamPartition systemStreamPartition = systemStreamPartitions.iterator().next();
assertEquals(expectedSystemStreamPartition, systemStreamPartition);