You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/12 23:05:25 UTC
samza git commit: SAMZA-948: Samza CoordinatorStreamSystemConsumer is not thread-safe
Repository: samza
Updated Branches:
refs/heads/master a460a82e3 -> 023a7ce23
SAMZA-948: Samza CoordinatorStreamSystemConsumer is not thread-safe
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/023a7ce2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/023a7ce2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/023a7ce2
Branch: refs/heads/master
Commit: 023a7ce233631c14c40ee5241e13a298362d07b2
Parents: a460a82
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu May 12 11:07:49 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu May 12 11:07:49 2016 -0700
----------------------------------------------------------------------
.../stream/CoordinatorStreamSystemConsumer.java | 75 +++++++++++---------
.../samza/coordinator/JobCoordinator.scala | 2 +-
.../TestCoordinatorStreamSystemConsumer.java | 23 ------
3 files changed, 43 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 8e1057b..0a6661c 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -19,13 +19,13 @@
package org.apache.samza.coordinator.stream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
@@ -58,15 +58,16 @@ public class CoordinatorStreamSystemConsumer {
private final SystemConsumer systemConsumer;
private final SystemAdmin systemAdmin;
private final Map<String, String> configMap;
- private boolean isBootstrapped;
- private boolean isStarted;
- private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new LinkedHashSet<CoordinatorStreamMessage>();
+ private volatile boolean isStarted;
+ private volatile boolean isBootstrapped;
+ private final Object bootstrapLock = new Object();
+ private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet();
public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
this.systemConsumer = systemConsumer;
this.systemAdmin = systemAdmin;
- this.configMap = new HashMap<String, String>();
+ this.configMap = new HashMap();
this.isBootstrapped = false;
this.keySerde = keySerde;
this.messageSerde = messageSerde;
@@ -139,38 +140,46 @@ public class CoordinatorStreamSystemConsumer {
* Currently, this method only pays attention to config messages.
*/
public void bootstrap() {
- log.info("Bootstrapping configuration from coordinator stream.");
- SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+ synchronized (bootstrapLock) {
+ // Make a copy so readers aren't affected while we modify the set.
+ final LinkedHashSet<CoordinatorStreamMessage> bootstrappedMessages = new LinkedHashSet<>(bootstrappedStreamSet);
- try {
- while (iterator.hasNext()) {
- IncomingMessageEnvelope envelope = iterator.next();
- Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
- Map<String, Object> valueMap = null;
- if (envelope.getMessage() != null) {
- valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
- }
- CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
- log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
- // Remove any existing entry. Set.add() does not add if the element already exists.
- if (bootstrappedStreamSet.remove(coordinatorStreamMessage)) {
- log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
- }
- bootstrappedStreamSet.add(coordinatorStreamMessage);
- if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
- String configKey = coordinatorStreamMessage.getKey();
- if (coordinatorStreamMessage.isDelete()) {
- configMap.remove(configKey);
- } else {
- String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
- configMap.put(configKey, configValue);
+ log.info("Bootstrapping configuration from coordinator stream.");
+ SystemStreamPartitionIterator iterator =
+ new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+
+ try {
+ while (iterator.hasNext()) {
+ IncomingMessageEnvelope envelope = iterator.next();
+ Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
+ Map<String, Object> valueMap = null;
+ if (envelope.getMessage() != null) {
+ valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
+ }
+ CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
+ log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+ // Remove any existing entry. Set.add() does not add if the element already exists.
+ if (bootstrappedMessages.remove(coordinatorStreamMessage)) {
+ log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
+ }
+ bootstrappedMessages.add(coordinatorStreamMessage);
+ if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+ String configKey = coordinatorStreamMessage.getKey();
+ if (coordinatorStreamMessage.isDelete()) {
+ configMap.remove(configKey);
+ } else {
+ String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
+ configMap.put(configKey, configValue);
+ }
}
}
+
+ bootstrappedStreamSet = Collections.unmodifiableSet(bootstrappedMessages);
+ log.debug("Bootstrapped configuration: {}", configMap);
+ isBootstrapped = true;
+ } catch (Exception e) {
+ throw new SamzaException(e);
}
- log.debug("Bootstrapped configuration: {}", configMap);
- isBootstrapped = true;
- } catch (Exception e) {
- throw new SamzaException(e);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 03f48db..bd7f3f5 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -165,8 +165,8 @@ object JobCoordinator extends Logging {
// Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache)
val grouper = getSystemStreamPartitionGrouper(config)
- info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
val groups = grouper.group(allSystemStreamPartitions)
+ info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet()))
// Initialize the ChangelogPartitionManager and the CheckpointManager
val previousChangelogMapping = if (changelogManager != null)
http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 9499027..03dcbb1 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -22,7 +22,6 @@ package org.apache.samza.coordinator.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -70,7 +69,6 @@ public class TestCoordinatorStreamSystemConsumer {
// Expected.
}
consumer.bootstrap();
- assertTrue(testOrder(consumer.getBoostrappedStream()));
assertEquals(expectedConfig, consumer.getConfig());
assertFalse(systemConsumer.isStopped());
consumer.stop();
@@ -94,27 +92,6 @@ public class TestCoordinatorStreamSystemConsumer {
assertEquals(1, systemConsumer.getRegisterCount());
}
- private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) {
- int initialSize = bootstrappedStreamSet.size();
- List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>();
- listStreamMessages.add(new SetConfig("order1", "job.name.order1", "my-order1-name"));
- listStreamMessages.add(new SetConfig("order2", "job.name.order2", "my-order2-name"));
- listStreamMessages.add(new SetConfig("order3", "job.name.order3", "my-order3-name"));
- bootstrappedStreamSet.addAll(listStreamMessages);
- Iterator<CoordinatorStreamMessage> iter = bootstrappedStreamSet.iterator();
-
- for (int i = 0; i < initialSize; ++i) {
- iter.next();
- }
- int i = 0;
- while (iter.hasNext()) {
- if (!iter.next().getKey().equals(listStreamMessages.get(i++).getKey())) {
- return false;
- }
- }
- return true;
- }
-
/**
* Verify that if a particular key-value is written, then another, then the original again,
* that the original occurs last in the set.