You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:22 UTC
[06/50] [abbrv] samza git commit: SAMZA-789: avoid re-registering the
coordinator stream partition after the CoordinatorStreamSystemConsumer has
started
SAMZA-789: avoid re-registering the coordinator stream partition after the CoordinatorStreamSystemConsumer has started
RB=583289
G=samza-reviewers
A=jmaes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9e1906af
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9e1906af
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9e1906af
Branch: refs/heads/samza-sql
Commit: 9e1906af377b6631842894cfcdfd8c90a18676ca
Parents: b14da28
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Tue Oct 6 18:13:44 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Thu Oct 8 23:22:04 2015 -0700
----------------------------------------------------------------------
.../stream/CoordinatorStreamSystemConsumer.java | 4 +++
.../TestCoordinatorStreamSystemConsumer.java | 29 +++++++++++++++-----
2 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9e1906af/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 3113f09..e1a7626 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -81,6 +81,10 @@ public class CoordinatorStreamSystemConsumer {
* coordinator stream with the SystemConsumer using the earliest offset.
*/
public void register() {
+ if (isStarted) {
+ log.info("Coordinator stream partition {} has already been registered. Skipping.", coordinatorSystemStreamPartition);
+ return;
+ }
log.debug("Attempting to register: {}", coordinatorSystemStreamPartition);
Set<String> streamNames = new HashSet<String>();
String streamName = coordinatorSystemStreamPartition.getStream();
http://git-wip-us.apache.org/repos/asf/samza/blob/9e1906af/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 370cfb7..0e73e18 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -52,9 +52,9 @@ public class TestCoordinatorStreamSystemConsumer {
SystemStream systemStream = new SystemStream("system", "stream");
MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
- assertFalse(systemConsumer.isRegistered());
+ assertEquals(0, systemConsumer.getRegisterCount());
consumer.register();
- assertTrue(systemConsumer.isRegistered());
+ assertEquals(1, systemConsumer.getRegisterCount());
assertFalse(systemConsumer.isStarted());
consumer.start();
assertTrue(systemConsumer.isStarted());
@@ -72,6 +72,23 @@ public class TestCoordinatorStreamSystemConsumer {
assertTrue(systemConsumer.isStopped());
}
+ @Test
+ public void testCoordinatorStreamSystemConsumerRegisterOnceOnly() throws Exception {
+ Map<String, String> expectedConfig = new LinkedHashMap<String, String>();
+ expectedConfig.put("job.id", "1234");
+ SystemStream systemStream = new SystemStream("system", "stream");
+ MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
+ CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
+ assertEquals(0, systemConsumer.getRegisterCount());
+ consumer.register();
+ assertEquals(1, systemConsumer.getRegisterCount());
+ assertFalse(systemConsumer.isStarted());
+ consumer.start();
+ assertTrue(systemConsumer.isStarted());
+ consumer.register();
+ assertEquals(1, systemConsumer.getRegisterCount());
+ }
+
private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) {
int initialSize = bootstrappedStreamSet.size();
List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>();
@@ -96,7 +113,7 @@ public class TestCoordinatorStreamSystemConsumer {
private static class MockSystemConsumer implements SystemConsumer {
private boolean started = false;
private boolean stopped = false;
- private boolean registered = false;
+ private int registerCount = 0;
private final SystemStreamPartition expectedSystemStreamPartition;
private int pollCount = 0;
@@ -113,13 +130,11 @@ public class TestCoordinatorStreamSystemConsumer {
}
public void register(SystemStreamPartition systemStreamPartition, String offset) {
- registered = true;
+ registerCount++;
assertEquals(expectedSystemStreamPartition, systemStreamPartition);
}
- public boolean isRegistered() {
- return registered;
- }
+ public int getRegisterCount() { return registerCount; }
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();