You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:22 UTC

[06/50] [abbrv] samza git commit: SAMZA-789: avoid re-register coordinator stream partition after the CoordinatorStreamSystemConsumer has started

SAMZA-789: avoid re-register coordinator stream partition after the CoordinatorStreamSystemConsumer has started

RB=583289
G=samza-reviewers
A=jmaes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9e1906af
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9e1906af
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9e1906af

Branch: refs/heads/samza-sql
Commit: 9e1906af377b6631842894cfcdfd8c90a18676ca
Parents: b14da28
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Tue Oct 6 18:13:44 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Thu Oct 8 23:22:04 2015 -0700

----------------------------------------------------------------------
 .../stream/CoordinatorStreamSystemConsumer.java |  4 +++
 .../TestCoordinatorStreamSystemConsumer.java    | 29 +++++++++++++++-----
 2 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9e1906af/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 3113f09..e1a7626 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -81,6 +81,10 @@ public class CoordinatorStreamSystemConsumer {
    * coordinator stream with the SystemConsumer using the earliest offset.
    */
   public void register() {
+    if (isStarted) {
+      log.info("Coordinator stream partition {} has already been registered. Skipping.", coordinatorSystemStreamPartition);
+      return;
+    }
     log.debug("Attempting to register: {}", coordinatorSystemStreamPartition);
     Set<String> streamNames = new HashSet<String>();
     String streamName = coordinatorSystemStreamPartition.getStream();

http://git-wip-us.apache.org/repos/asf/samza/blob/9e1906af/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 370cfb7..0e73e18 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -52,9 +52,9 @@ public class TestCoordinatorStreamSystemConsumer {
     SystemStream systemStream = new SystemStream("system", "stream");
     MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
     CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
-    assertFalse(systemConsumer.isRegistered());
+    assertEquals(0, systemConsumer.getRegisterCount());
     consumer.register();
-    assertTrue(systemConsumer.isRegistered());
+    assertEquals(1, systemConsumer.getRegisterCount());
     assertFalse(systemConsumer.isStarted());
     consumer.start();
     assertTrue(systemConsumer.isStarted());
@@ -72,6 +72,23 @@ public class TestCoordinatorStreamSystemConsumer {
     assertTrue(systemConsumer.isStopped());
   }
 
+  @Test
+  public void testCoordinatorStreamSystemConsumerRegisterOnceOnly() throws Exception {
+    Map<String, String> expectedConfig = new LinkedHashMap<String, String>();
+    expectedConfig.put("job.id", "1234");
+    SystemStream systemStream = new SystemStream("system", "stream");
+    MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
+    assertEquals(0, systemConsumer.getRegisterCount());
+    consumer.register();
+    assertEquals(1, systemConsumer.getRegisterCount());
+    assertFalse(systemConsumer.isStarted());
+    consumer.start();
+    assertTrue(systemConsumer.isStarted());
+    consumer.register();
+    assertEquals(1, systemConsumer.getRegisterCount());
+  }
+
   private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) {
     int initialSize = bootstrappedStreamSet.size();
     List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>();
@@ -96,7 +113,7 @@ public class TestCoordinatorStreamSystemConsumer {
   private static class MockSystemConsumer implements SystemConsumer {
     private boolean started = false;
     private boolean stopped = false;
-    private boolean registered = false;
+    private int registerCount = 0;
     private final SystemStreamPartition expectedSystemStreamPartition;
     private int pollCount = 0;
 
@@ -113,13 +130,11 @@ public class TestCoordinatorStreamSystemConsumer {
     }
 
     public void register(SystemStreamPartition systemStreamPartition, String offset) {
-      registered = true;
+      registerCount++;
       assertEquals(expectedSystemStreamPartition, systemStreamPartition);
     }
 
-    public boolean isRegistered() {
-      return registered;
-    }
+    public int getRegisterCount() { return registerCount; }
 
     public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();