You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/12 23:05:25 UTC

samza git commit: SAMZA-948: Samza CoordinatorStreamSystemConsumer is not thread-safe

Repository: samza
Updated Branches:
  refs/heads/master a460a82e3 -> 023a7ce23


SAMZA-948: Samza CoordinatorStreamSystemConsumer is not thread-safe


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/023a7ce2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/023a7ce2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/023a7ce2

Branch: refs/heads/master
Commit: 023a7ce233631c14c40ee5241e13a298362d07b2
Parents: a460a82
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu May 12 11:07:49 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu May 12 11:07:49 2016 -0700

----------------------------------------------------------------------
 .../stream/CoordinatorStreamSystemConsumer.java | 75 +++++++++++---------
 .../samza/coordinator/JobCoordinator.scala      |  2 +-
 .../TestCoordinatorStreamSystemConsumer.java    | 23 ------
 3 files changed, 43 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 8e1057b..0a6661c 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -19,13 +19,13 @@
 
 package org.apache.samza.coordinator.stream;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -58,15 +58,16 @@ public class CoordinatorStreamSystemConsumer {
   private final SystemConsumer systemConsumer;
   private final SystemAdmin systemAdmin;
   private final Map<String, String> configMap;
-  private boolean isBootstrapped;
-  private boolean isStarted;
-  private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new LinkedHashSet<CoordinatorStreamMessage>();
+  private volatile boolean isStarted;
+  private volatile boolean isBootstrapped;
+  private final Object bootstrapLock = new Object();
+  private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet();
 
   public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
     this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
     this.systemConsumer = systemConsumer;
     this.systemAdmin = systemAdmin;
-    this.configMap = new HashMap<String, String>();
+    this.configMap = new HashMap();
     this.isBootstrapped = false;
     this.keySerde = keySerde;
     this.messageSerde = messageSerde;
@@ -139,38 +140,46 @@ public class CoordinatorStreamSystemConsumer {
    * Currently, this method only pays attention to config messages.
    */
   public void bootstrap() {
-    log.info("Bootstrapping configuration from coordinator stream.");
-    SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+    synchronized (bootstrapLock) {
+      // Make a copy so readers aren't affected while we modify the set.
+      final LinkedHashSet<CoordinatorStreamMessage> bootstrappedMessages = new LinkedHashSet<>(bootstrappedStreamSet);
 
-    try {
-      while (iterator.hasNext()) {
-        IncomingMessageEnvelope envelope = iterator.next();
-        Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
-        Map<String, Object> valueMap = null;
-        if (envelope.getMessage() != null) {
-          valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
-        }
-        CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
-        log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
-        // Remove any existing entry. Set.add() does not add if the element already exists.
-        if (bootstrappedStreamSet.remove(coordinatorStreamMessage)) {
-          log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
-        }
-        bootstrappedStreamSet.add(coordinatorStreamMessage);
-        if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
-          String configKey = coordinatorStreamMessage.getKey();
-          if (coordinatorStreamMessage.isDelete()) {
-            configMap.remove(configKey);
-          } else {
-            String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
-            configMap.put(configKey, configValue);
+      log.info("Bootstrapping configuration from coordinator stream.");
+      SystemStreamPartitionIterator iterator =
+          new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+
+      try {
+        while (iterator.hasNext()) {
+          IncomingMessageEnvelope envelope = iterator.next();
+          Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
+          Map<String, Object> valueMap = null;
+          if (envelope.getMessage() != null) {
+            valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
+          }
+          CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
+          log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+          // Remove any existing entry. Set.add() does not add if the element already exists.
+          if (bootstrappedMessages.remove(coordinatorStreamMessage)) {
+            log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
+          }
+          bootstrappedMessages.add(coordinatorStreamMessage);
+          if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+            String configKey = coordinatorStreamMessage.getKey();
+            if (coordinatorStreamMessage.isDelete()) {
+              configMap.remove(configKey);
+            } else {
+              String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
+              configMap.put(configKey, configValue);
+            }
           }
         }
+
+        bootstrappedStreamSet = Collections.unmodifiableSet(bootstrappedMessages);
+        log.debug("Bootstrapped configuration: {}", configMap);
+        isBootstrapped = true;
+      } catch (Exception e) {
+        throw new SamzaException(e);
       }
-      log.debug("Bootstrapped configuration: {}", configMap);
-      isBootstrapped = true;
-    } catch (Exception e) {
-      throw new SamzaException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 03f48db..bd7f3f5 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -165,8 +165,8 @@ object JobCoordinator extends Logging {
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache)
     val grouper = getSystemStreamPartitionGrouper(config)
-    info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
     val groups = grouper.group(allSystemStreamPartitions)
+    info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet()))
 
     // Initialize the ChangelogPartitionManager and the CheckpointManager
     val previousChangelogMapping = if (changelogManager != null)

http://git-wip-us.apache.org/repos/asf/samza/blob/023a7ce2/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 9499027..03dcbb1 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -22,7 +22,6 @@ package org.apache.samza.coordinator.stream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -70,7 +69,6 @@ public class TestCoordinatorStreamSystemConsumer {
       // Expected.
     }
     consumer.bootstrap();
-    assertTrue(testOrder(consumer.getBoostrappedStream()));
     assertEquals(expectedConfig, consumer.getConfig());
     assertFalse(systemConsumer.isStopped());
     consumer.stop();
@@ -94,27 +92,6 @@ public class TestCoordinatorStreamSystemConsumer {
     assertEquals(1, systemConsumer.getRegisterCount());
   }
 
-  private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) {
-    int initialSize = bootstrappedStreamSet.size();
-    List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>();
-    listStreamMessages.add(new SetConfig("order1", "job.name.order1", "my-order1-name"));
-    listStreamMessages.add(new SetConfig("order2", "job.name.order2", "my-order2-name"));
-    listStreamMessages.add(new SetConfig("order3", "job.name.order3", "my-order3-name"));
-    bootstrappedStreamSet.addAll(listStreamMessages);
-    Iterator<CoordinatorStreamMessage> iter = bootstrappedStreamSet.iterator();
-
-    for (int i = 0;  i < initialSize; ++i) {
-      iter.next();
-    }
-    int i = 0;
-    while (iter.hasNext()) {
-      if (!iter.next().getKey().equals(listStreamMessages.get(i++).getKey())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   /**
    * Verify that if a particular key-value is written, then another, then the original again,
    * that the original occurs last in the set.