You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/08 16:50:06 UTC
[samza] branch master updated: SAMZA-2179: Move the
StartpointVisitor abstraction to SystemAdmin interface. (#1016)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2d07cfd SAMZA-2179: Move the StartpointVisitor abstraction to SystemAdmin interface. (#1016)
2d07cfd is described below
commit 2d07cfdaf36a33aaeed08d13cdb48c28b4f30bc8
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Wed May 8 09:50:00 2019 -0700
SAMZA-2179: Move the StartpointVisitor abstraction to SystemAdmin interface. (#1016)
* Move StartpointVisitor to SystemAdmin.
---
.../org/apache/samza/startpoint/Startpoint.java | 15 +-
.../apache/samza/startpoint/StartpointCustom.java | 43 ----
.../apache/samza/startpoint/StartpointOldest.java | 6 +-
.../samza/startpoint/StartpointSpecific.java | 6 +-
.../samza/startpoint/StartpointTimestamp.java | 13 +-
.../samza/startpoint/StartpointUpcoming.java | 6 +-
.../apache/samza/startpoint/StartpointVisitor.java | 56 +++--
.../java/org/apache/samza/system/SystemAdmin.java | 11 +-
.../samza/startpoint/MockStartpointCustom.java | 50 ----
.../apache/samza/startpoint/TestStartpoint.java | 32 +--
.../samza/startpoint/TestStartpointManager.java | 31 +--
.../samza/startpoint/TestStartpointSerde.java | 12 -
.../samza/system/kafka/KafkaSystemAdmin.java | 251 +++++++++++++++------
.../samza/system/kafka/KafkaSystemConsumer.java | 199 ++++------------
.../scala/org/apache/samza/util/KafkaUtil.scala | 18 +-
.../system/kafka/TestKafkaSystemAdminJava.java | 102 ++++++++-
.../system/kafka/TestKafkaSystemConsumer.java | 199 +++-------------
17 files changed, 460 insertions(+), 590 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java b/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
index a65dacf..fcca792 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
@@ -22,13 +22,11 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.time.Instant;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStreamPartition;
-
/**
* Startpoint represents a position in a stream partition.
*/
-@InterfaceStability.Unstable
+@InterfaceStability.Evolving
public abstract class Startpoint {
private final long creationTimestamp;
@@ -50,12 +48,13 @@ public abstract class Startpoint {
}
/**
- * Apply the visitor {@link StartpointVisitor}'s register methods to the instance of this {@link Startpoint}
- * class.
- * @param systemStreamPartition The {@link SystemStreamPartition} needed to register with the {@link StartpointVisitor}
- * @param startpointVisitor The visitor to register with.
+ * Applies the {@link StartpointVisitor}'s visit methods to the {@link Startpoint}
+ * and returns the result of that operation.
+ * @param input the metadata associated with the startpoint.
+ * @param startpointVisitor the visitor of the startpoint.
+ * @return the result of applying the visitor on startpoint.
*/
- public abstract void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor);
+ public abstract <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor);
@Override
public String toString() {
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java
deleted file mode 100644
index a52c974..0000000
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.startpoint;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * A {@link Startpoint} that represents a custom startpoint. This is for systems that have a non-generic option
- * for setting offsets. Startpoints are serialized to JSON in the {@link org.apache.samza.metadatastore.MetadataStore}
- * and it is recommended to maintain the subclass of this {@link StartpointCustom} as a simple POJO.
- */
-public abstract class StartpointCustom extends Startpoint {
-
- StartpointCustom() {
- super();
- }
-
- StartpointCustom(long creationTimestamp) {
- super(creationTimestamp);
- }
-
- @Override
- public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
- startpointVisitor.visit(systemStreamPartition, this);
- }
-}
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
index 53872db..6f98723 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
@@ -19,8 +19,6 @@
package org.apache.samza.startpoint;
import com.google.common.base.MoreObjects;
-import org.apache.samza.system.SystemStreamPartition;
-
/**
* A {@link Startpoint} that represents the earliest offset in a stream partition.
@@ -35,8 +33,8 @@ public final class StartpointOldest extends Startpoint {
}
@Override
- public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
- startpointVisitor.visit(systemStreamPartition, this);
+ public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
+ return startpointVisitor.visit(input, this);
}
@Override
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
index b544489..c56d8f0 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
@@ -20,8 +20,6 @@ package org.apache.samza.startpoint;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
-import org.apache.samza.system.SystemStreamPartition;
-
/**
* A {@link Startpoint} that represents a specific offset in a stream partition.
@@ -53,8 +51,8 @@ public final class StartpointSpecific extends Startpoint {
}
@Override
- public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
- startpointVisitor.visit(systemStreamPartition, this);
+ public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
+ return startpointVisitor.visit(input, this);
}
@Override
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
index 609015c..3792403 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
@@ -18,10 +18,9 @@
*/
package org.apache.samza.startpoint;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
-import org.apache.samza.system.SystemStreamPartition;
-
/**
* A {@link Startpoint} that represents a timestamp offset in a stream partition.
@@ -44,6 +43,12 @@ public final class StartpointTimestamp extends Startpoint {
this.timestampOffset = timestampOffset;
}
+ @VisibleForTesting
+ StartpointTimestamp(Long timestampOffset, Long creationTimeStamp) {
+ super(creationTimeStamp);
+ this.timestampOffset = timestampOffset;
+ }
+
/**
* Getter for the timestamp offset.
* @return the timestamp offset in milliseconds.
@@ -53,8 +58,8 @@ public final class StartpointTimestamp extends Startpoint {
}
@Override
- public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
- startpointVisitor.visit(systemStreamPartition, this);
+ public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
+ return startpointVisitor.visit(input, this);
}
@Override
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
index c477931..26af2ae 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
@@ -19,8 +19,6 @@
package org.apache.samza.startpoint;
import com.google.common.base.MoreObjects;
-import org.apache.samza.system.SystemStreamPartition;
-
/**
* A {@link Startpoint} that represents the latest offset in a stream partition.
@@ -35,8 +33,8 @@ public final class StartpointUpcoming extends Startpoint {
}
@Override
- public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
- startpointVisitor.visit(systemStreamPartition, this);
+ public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
+ return startpointVisitor.visit(input, this);
}
@Override
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
index 99151ab..72b1552 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
@@ -18,54 +18,52 @@
*/
package org.apache.samza.startpoint;
-import org.apache.samza.system.SystemStreamPartition;
-
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.SystemAdmin;
/**
- * Visitor interface for system consumers to implement to support {@link Startpoint}s.
+ * A {@link SystemAdmin} implementation should implement this abstraction to support {@link Startpoint}.
*/
-public interface StartpointVisitor {
+@InterfaceStability.Evolving
+public interface StartpointVisitor<IN, OUT> {
/**
- * Seek to specific offset represented by {@link StartpointSpecific}
- * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
- * @param startpointSpecific The {@link Startpoint} that represents the specific offset.
+ * Performs a sequence of operations using the {@code IN} input and {@link StartpointSpecific} and returns the result of the execution.
+ * @param input the input metadata about the startpoint.
+ * @param startpointSpecific the {@link Startpoint} that represents the specific offset.
+ * @return the result of executing the operations defined by the visit method.
*/
- void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific);
+ default OUT visit(IN input, StartpointSpecific startpointSpecific) {
+ throw new UnsupportedOperationException("StartpointSpecific is not supported.");
+ }
/**
- * Seek to timestamp offset represented by {@link StartpointTimestamp}
- * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
- * @param startpointTimestamp The {@link Startpoint} that represents the timestamp offset.
+ * Performs a sequence of operations using the {@code IN} input and {@link StartpointTimestamp} and returns the result of the execution.
+ * @param input the input metadata about the startpoint.
+ * @param startpointTimestamp the {@link Startpoint} that represents the timestamp.
+ * @return the result of executing the operations defined by the visit method.
*/
- default void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+ default OUT visit(IN input, StartpointTimestamp startpointTimestamp) {
throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
}
/**
- * Seek to earliest offset represented by {@link StartpointOldest}
- * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
- * @param startpointOldest The {@link Startpoint} that represents the earliest offset.
+ * Performs a sequence of operations using the {@code IN} input and {@link StartpointOldest} and returns the result of the execution.
+ * @param input the input metadata about the startpoint.
+ * @param startpointOldest the {@link Startpoint} that represents the earliest offset.
+ * @return the result of executing the operations defined by the visit method.
*/
- default void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+ default OUT visit(IN input, StartpointOldest startpointOldest) {
throw new UnsupportedOperationException("StartpointOldest is not supported.");
}
/**
- * Seek to latest offset represented by {@link StartpointUpcoming}
- * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
- * @param startpointUpcoming The {@link Startpoint} that represents the latest offset.
+ * Performs a sequence of operations using the {@code IN} input and {@link StartpointUpcoming} and returns the result of the execution.
+ * @param input the input metadata about the startpoint.
+ * @param startpointUpcoming the {@link Startpoint} that represents the latest offset.
+ * @return the result of executing the operations defined by the visit method.
*/
- default void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+ default OUT visit(IN input, StartpointUpcoming startpointUpcoming) {
throw new UnsupportedOperationException("StartpointUpcoming is not supported.");
}
-
- /**
- * Bootstrap signal represented by {@link StartpointCustom}
- * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
- * @param startpointCustom The {@link Startpoint} that represents the bootstrap signal.
- */
- default void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
- throw new UnsupportedOperationException(String.format("%s is not supported.", startpointCustom.getClass().getSimpleName()));
- }
}
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 8201b3d..1cd345e 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-
+import org.apache.samza.startpoint.Startpoint;
/**
* Helper interface attached to an underlying system to fetch information about
@@ -164,4 +164,13 @@ public interface SystemAdmin {
throw new UnsupportedOperationException();
}
+ /**
+ * Resolves the startpoint to a system specific offset.
+ * @param startpoint represents the startpoint.
+ * @param systemStreamPartition represents the system stream partition.
+ * @return the resolved offset.
+ */
+ default String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java b/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java
deleted file mode 100644
index de7b18a..0000000
--- a/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.startpoint;
-
-import java.time.Instant;
-
-
-public class MockStartpointCustom extends StartpointCustom {
- private final String testInfo1;
- private final long testInfo2;
-
- // Default constructor needed for serde.
- private MockStartpointCustom() {
- this(null, 0);
- }
-
- public MockStartpointCustom(String testInfo1, long testInfo2) {
- this(testInfo1, testInfo2, Instant.now().toEpochMilli());
- }
-
- public MockStartpointCustom(String testInfo1, long testInfo2, long creationTimestamp) {
- super(creationTimestamp);
- this.testInfo1 = testInfo1;
- this.testInfo2 = testInfo2;
- }
-
- public String getTestInfo1() {
- return testInfo1;
- }
-
- public long getTestInfo2() {
- return testInfo2;
- }
-}
diff --git a/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java b/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
index b54e569..a3fabee 100644
--- a/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
+++ b/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
@@ -24,7 +24,6 @@ import org.apache.samza.system.SystemStreamPartition;
import org.junit.Assert;
import org.junit.Test;
-
public class TestStartpoint {
@Test
@@ -69,44 +68,31 @@ public class TestStartpoint {
Assert.assertEquals(StartpointUpcoming.class, mockStartpointVisitorConsumer.visitedClass);
}
- @Test
- public void testStartpointCustom() {
- MockStartpointCustom startpoint = new MockStartpointCustom("test12345", 12345);
- Assert.assertEquals("test12345", startpoint.getTestInfo1());
- Assert.assertEquals(12345, startpoint.getTestInfo2());
- Assert.assertTrue(startpoint.getCreationTimestamp() <= Instant.now().toEpochMilli());
-
- MockStartpointVisitor mockStartpointVisitorConsumer = new MockStartpointVisitor();
- startpoint.apply(new SystemStreamPartition("sys", "stream", new Partition(1)), mockStartpointVisitorConsumer);
- Assert.assertEquals(MockStartpointCustom.class, mockStartpointVisitorConsumer.visitedClass);
- }
-
- static class MockStartpointVisitor implements StartpointVisitor {
+ static class MockStartpointVisitor implements StartpointVisitor<SystemStreamPartition, String> {
Class<? extends Startpoint> visitedClass;
@Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
visitedClass = startpointSpecific.getClass();
+ return null;
}
@Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
visitedClass = startpointTimestamp.getClass();
+ return null;
}
@Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
visitedClass = startpointOldest.getClass();
+ return null;
}
@Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
visitedClass = startpointUpcoming.getClass();
- }
-
- @Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
- visitedClass = startpointCustom.getClass();
+ return null;
}
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
index cc86882..b4c33c3 100644
--- a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
@@ -64,6 +64,22 @@ public class TestStartpointManager {
}
@Test
+ public void testStaleStartpoints() {
+ SystemStreamPartition ssp = new SystemStreamPartition("mockSystem", "mockStream", new Partition(2));
+ TaskName taskName = new TaskName("MockTask");
+
+ startpointManager.start();
+ long staleTimestamp = Instant.now().toEpochMilli() - StartpointManager.DEFAULT_EXPIRATION_DURATION.toMillis() - 2;
+ StartpointTimestamp startpoint = new StartpointTimestamp(staleTimestamp, staleTimestamp);
+
+ startpointManager.writeStartpoint(ssp, startpoint);
+ Assert.assertNull(startpointManager.readStartpoint(ssp));
+
+ startpointManager.writeStartpoint(ssp, taskName, startpoint);
+ Assert.assertNull(startpointManager.readStartpoint(ssp, taskName));
+ }
+
+ @Test
public void testNoLongerUsableAfterStop() {
StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore);
startpointManager.start();
@@ -171,21 +187,6 @@ public class TestStartpointManager {
}
@Test
- public void testStaleStartpoints() {
- SystemStreamPartition ssp = new SystemStreamPartition("mockSystem", "mockStream", new Partition(2));
- TaskName taskName = new TaskName("MockTask");
-
- startpointManager.start();
- long staleTimestamp = Instant.now().toEpochMilli() - StartpointManager.DEFAULT_EXPIRATION_DURATION.toMillis() - 2;
- MockStartpointCustom startpoint = new MockStartpointCustom("das boot", 42, staleTimestamp);
- startpointManager.writeStartpoint(ssp, startpoint);
- Assert.assertNull(startpointManager.readStartpoint(ssp));
-
- startpointManager.writeStartpoint(ssp, taskName, startpoint);
- Assert.assertNull(startpointManager.readStartpoint(ssp, taskName));
- }
-
- @Test
public void testGroupStartpointsPerTask() {
MapConfig config = new MapConfig();
startpointManager.start();
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
index c939680..9dfb784 100644
--- a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
@@ -21,7 +21,6 @@ package org.apache.samza.startpoint;
import org.junit.Assert;
import org.junit.Test;
-
public class TestStartpointSerde {
private final StartpointSerde startpointSerde = new StartpointSerde();
@@ -62,15 +61,4 @@ public class TestStartpointSerde {
Assert.assertEquals(startpointUpcoming.getClass(), startpointFromSerde.getClass());
Assert.assertEquals(startpointUpcoming.getCreationTimestamp(), startpointFromSerde.getCreationTimestamp());
}
-
- @Test
- public void testStartpointCustomSerde() {
- MockStartpointCustom startpointCustom = new MockStartpointCustom("das boot", 42);
- Startpoint startpointFromSerde = startpointSerde.fromBytes(startpointSerde.toBytes(startpointCustom));
-
- Assert.assertEquals(startpointCustom.getClass(), startpointFromSerde.getClass());
- Assert.assertEquals(startpointCustom.getCreationTimestamp(), startpointFromSerde.getCreationTimestamp());
- Assert.assertEquals(startpointCustom.getTestInfo1(), ((MockStartpointCustom) startpointFromSerde).getTestInfo1());
- Assert.assertEquals(startpointCustom.getTestInfo2(), ((MockStartpointCustom) startpointFromSerde).getTestInfo2());
- }
}
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index a82a6b9..c18c82d 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -20,12 +20,15 @@
package org.apache.samza.system.kafka;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -43,6 +46,7 @@ import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
@@ -55,6 +59,12 @@ import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.SystemConfig;
+import org.apache.samza.startpoint.Startpoint;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
+import org.apache.samza.startpoint.StartpointVisitor;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
@@ -62,6 +72,7 @@ import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.KafkaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
@@ -90,7 +101,6 @@ public class KafkaSystemAdmin implements SystemAdmin {
public static volatile boolean deleteMessageCalled = false;
protected final String systemName;
- private final Consumer metadataConsumer; // Need to synchronize all accesses, since KafkaConsumer is not thread-safe
protected final Config config;
// Custom properties to create a new coordinator stream.
@@ -112,6 +122,8 @@ public class KafkaSystemAdmin implements SystemAdmin {
final AdminClient adminClient;
private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private final ThreadSafeKafkaConsumer threadSafeKafkaConsumer;
+ private final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver;
public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) {
this.systemName = systemName;
@@ -121,7 +133,8 @@ public class KafkaSystemAdmin implements SystemAdmin {
throw new SamzaException(
"Cannot construct KafkaSystemAdmin for system " + systemName + " with null metadataConsumer");
}
- this.metadataConsumer = metadataConsumer;
+ this.threadSafeKafkaConsumer = new ThreadSafeKafkaConsumer(metadataConsumer);
+ this.kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(threadSafeKafkaConsumer);
Properties props = createAdminClientProperties();
LOG.info("New admin client with props:" + props);
@@ -176,9 +189,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
public void stop() {
if (stopped.compareAndSet(false, true)) {
try {
- metadataConsumer.close();
+ threadSafeKafkaConsumer.close();
} catch (Exception e) {
- LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e);
+ LOG.warn(String.format("Exception occurred when closing consumer of system: %s.", systemName), e);
}
}
@@ -231,10 +244,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
streamNames.forEach(streamName -> {
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
- List<PartitionInfo> partitionInfos;
- synchronized (metadataConsumer) {
- partitionInfos = metadataConsumer.partitionsFor(streamName);
- }
+ List<PartitionInfo> partitionInfos = threadSafeKafkaConsumer.execute(consumer -> consumer.partitionsFor(streamName));
LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
partitionInfos.forEach(
partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
@@ -364,39 +374,29 @@ public class KafkaSystemAdmin implements SystemAdmin {
}
/**
- * Convert TopicPartition to SystemStreamPartition
- * @param topicPartition the topic partition to be created
- * @return an instance of SystemStreamPartition
- */
- private SystemStreamPartition toSystemStreamPartition(TopicPartition topicPartition) {
- String topic = topicPartition.topic();
- Partition partition = new Partition(topicPartition.partition());
- return new SystemStreamPartition(systemName, topic, partition);
- }
-
- /**
- * Uses {@code metadataConsumer} to fetch the metadata for the {@code topicPartitions}.
- * Warning: If multiple threads call this with the same {@code metadataConsumer}, then this will not protect against
- * concurrent access to the {@code metadataConsumer}.
+ * Uses the thread-safe Kafka consumer wrapper to fetch the beginning and end offsets for the {@code topicPartitions}.
*/
private OffsetsMaps fetchTopicPartitionsMetadata(List<TopicPartition> topicPartitions) {
Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
Map<SystemStreamPartition, String> newestOffsets = new HashMap<>();
Map<SystemStreamPartition, String> upcomingOffsets = new HashMap<>();
- Map<TopicPartition, Long> oldestOffsetsWithLong;
- Map<TopicPartition, Long> upcomingOffsetsWithLong;
-
- synchronized (metadataConsumer) {
- oldestOffsetsWithLong = metadataConsumer.beginningOffsets(topicPartitions);
- LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
- upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions);
- LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
- }
+ final Map<TopicPartition, Long> oldestOffsetsWithLong = new HashMap<>();
+ final Map<TopicPartition, Long> upcomingOffsetsWithLong = new HashMap<>();
+
+ threadSafeKafkaConsumer.execute(consumer -> {
+ Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
+ LOG.debug("Beginning offsets for topic-partitions: {} is {}", topicPartitions, beginningOffsets);
+ oldestOffsetsWithLong.putAll(beginningOffsets);
+ Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+ LOG.debug("End offsets for topic-partitions: {} is {}", topicPartitions, endOffsets);
+ upcomingOffsetsWithLong.putAll(endOffsets);
+ return Optional.empty();
+ });
- oldestOffsetsWithLong.forEach((topicPartition, offset) -> oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset)));
+ oldestOffsetsWithLong.forEach((topicPartition, offset) -> oldestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset)));
upcomingOffsetsWithLong.forEach((topicPartition, offset) -> {
- upcomingOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset));
+ upcomingOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset));
// Kafka's beginning Offset corresponds to the offset for the oldest message.
// Kafka's end offset corresponds to the offset for the upcoming message, and it is the newest offset + 1.
@@ -409,9 +409,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
LOG.warn(
"Empty Kafka topic partition {} with upcoming offset {}. Skipping newest offset and setting oldest offset to 0 to consume from beginning",
topicPartition, offset);
- oldestOffsets.put(toSystemStreamPartition(topicPartition), "0");
+ oldestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), "0");
} else {
- newestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset - 1));
+ newestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset - 1));
}
});
return new OffsetsMaps(oldestOffsets, newestOffsets, upcomingOffsets);
@@ -430,23 +430,20 @@ public class KafkaSystemAdmin implements SystemAdmin {
LOG.info("Fetching SystemStreamMetadata for topics {} on system {}", topics, systemName);
topics.forEach(topic -> {
- synchronized (metadataConsumer) {
- List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
-
- if (partitionInfos == null) {
- String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
- throw new SamzaException(msg);
- }
-
- List<TopicPartition> topicPartitions = partitionInfos.stream()
- .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
- .collect(Collectors.toList());
-
- OffsetsMaps offsetsForTopic = fetchTopicPartitionsMetadata(topicPartitions);
- allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
- allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
- allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
- }
+ OffsetsMaps offsetsForTopic = threadSafeKafkaConsumer.execute(consumer -> {
+ List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+ if (partitionInfos == null) {
+ String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
+ throw new SamzaException(msg);
+ }
+ List<TopicPartition> topicPartitions = partitionInfos.stream()
+ .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+ .collect(Collectors.toList());
+ return fetchTopicPartitionsMetadata(topicPartitions);
+ });
+ allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
+ allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
+ allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
});
return assembleMetadata(allOldestOffsets, allNewestOffsets, allUpcomingOffsets);
@@ -575,20 +572,6 @@ public class KafkaSystemAdmin implements SystemAdmin {
}
}
- // get partition info for topic
- Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
- Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
- List<PartitionInfo> partitionInfoList;
- for (String topic : topics) {
- synchronized (metadataConsumer) {
- partitionInfoList = metadataConsumer.partitionsFor(topic);
- }
- streamToPartitionsInfo.put(topic, partitionInfoList);
- }
-
- return streamToPartitionsInfo;
- }
-
/**
* Delete records up to (and including) the provided ssp offsets for
* all system stream partitions specified in the map.
@@ -639,11 +622,12 @@ public class KafkaSystemAdmin implements SystemAdmin {
@Override
public Set<SystemStream> getAllSystemStreams() {
- synchronized (metadataConsumer) {
- return ((Set<String>) this.metadataConsumer.listTopics().keySet()).stream()
- .map(x -> new SystemStream(systemName, x))
- .collect(Collectors.toSet());
- }
+ Map<String, List<PartitionInfo>> topicToPartitionInfoMap = threadSafeKafkaConsumer.execute(consumer -> consumer.listTopics());
+ Set<SystemStream> systemStreams = topicToPartitionInfoMap.keySet()
+ .stream()
+ .map(topic -> new SystemStream(systemName, topic))
+ .collect(Collectors.toSet());
+ return systemStreams;
}
/**
@@ -719,6 +703,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
return coordinatorStreamProperties;
}
+ @Override
+ public String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+ return startpoint.apply(systemStreamPartition, kafkaStartpointToOffsetResolver);
+ }
+
/**
* Container for metadata about offsets.
*/
@@ -771,4 +760,124 @@ public class KafkaSystemAdmin implements SystemAdmin {
return kafkaProperties;
}
}
+
+ /**
+ * Offers a Kafka-specific implementation of {@link StartpointVisitor} that resolves
+ * the different types of {@link Startpoint} to a Samza offset.
+ */
+ @VisibleForTesting
+ static class KafkaStartpointToOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
+
+ private final ThreadSafeKafkaConsumer threadSafeKafkaConsumer;
+
+ public KafkaStartpointToOffsetResolver(ThreadSafeKafkaConsumer threadSafeKafkaConsumer) {
+ this.threadSafeKafkaConsumer = threadSafeKafkaConsumer;
+ }
+
+ @VisibleForTesting
+ KafkaStartpointToOffsetResolver(Consumer consumer) {
+ this.threadSafeKafkaConsumer = new ThreadSafeKafkaConsumer(consumer);
+ }
+
+ @Override
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+ return startpointSpecific.getSpecificOffset();
+ }
+
+ @Override
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+ Preconditions.checkNotNull(startpointTimestamp, "Startpoint cannot be null");
+ Preconditions.checkNotNull(startpointTimestamp.getTimestampOffset(), "Timestamp field in startpoint cannot be null");
+ TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+
+ Map<TopicPartition, Long> topicPartitionToTimestamp = ImmutableMap.of(topicPartition, startpointTimestamp.getTimestampOffset());
+ LOG.info("Finding offset for timestamp: {} in topic partition: {}.", startpointTimestamp.getTimestampOffset(), topicPartition);
+ Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = threadSafeKafkaConsumer.execute(consumer -> consumer.offsetsForTimes(topicPartitionToTimestamp));
+
+ OffsetAndTimestamp offsetAndTimestamp = topicPartitionToOffsetTimestamps.get(topicPartition);
+ if (offsetAndTimestamp != null) {
+ return String.valueOf(offsetAndTimestamp.offset());
+ } else {
+ LOG.info("Offset for timestamp: {} does not exist for partition: {}. Falling back to end offset.", startpointTimestamp.getTimestampOffset(), topicPartition);
+ return getEndOffset(systemStreamPartition);
+ }
+ }
+
+ @Override
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+ TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+ Map<TopicPartition, Long> topicPartitionToOffsets = threadSafeKafkaConsumer.execute(consumer -> consumer.beginningOffsets(ImmutableSet.of(topicPartition)));
+ Long beginningOffset = topicPartitionToOffsets.get(topicPartition);
+ LOG.info("Beginning offset for topic partition: {} is {}.", topicPartition, beginningOffset);
+ return String.valueOf(beginningOffset);
+ }
+
+ @Override
+ public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+ return getEndOffset(systemStreamPartition);
+ }
+
+ /**
+ * Converts the {@link SystemStreamPartition} to {@link TopicPartition}.
+ * @param systemStreamPartition the input system stream partition.
+ * @return the converted topic partition.
+ */
+ static TopicPartition toTopicPartition(SystemStreamPartition systemStreamPartition) {
+ Preconditions.checkNotNull(systemStreamPartition);
+ Preconditions.checkNotNull(systemStreamPartition.getPartition());
+ Preconditions.checkNotNull(systemStreamPartition.getStream());
+
+ return new TopicPartition(systemStreamPartition.getStream(), systemStreamPartition.getPartition().getPartitionId());
+ }
+
+ /**
+ * Determines the end offset of the given {@link SystemStreamPartition}.
+ * @param systemStreamPartition represents the system stream partition.
+ * @return the end offset of the partition.
+ */
+ private String getEndOffset(SystemStreamPartition systemStreamPartition) {
+ TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+ Map<TopicPartition, Long> topicPartitionToOffsets = threadSafeKafkaConsumer.execute(consumer -> consumer.endOffsets(ImmutableSet.of(topicPartition)));
+ Long endOffset = topicPartitionToOffsets.get(topicPartition);
+ LOG.info("End offset for topic partition: {} is {}.", topicPartition, endOffset);
+ return String.valueOf(endOffset);
+ }
+ }
+
+ /**
+ * Offers thread-safe operations over the vanilla {@link Consumer}.
+ */
+ static class ThreadSafeKafkaConsumer {
+
+ private final Consumer kafkaConsumer;
+
+ ThreadSafeKafkaConsumer(Consumer kafkaConsumer) {
+ this.kafkaConsumer = kafkaConsumer;
+ }
+
+ /**
+ * Executes the lambda function composed of kafka-consumer operations in a thread-safe manner
+ * and returns the result of the execution.
+ *
+ * @param function accepts the kafka consumer as argument and returns a result after executing a
+ * sequence of operations on a kafka-broker.
+ * @param <T> the return type of the lambda function.
+ * @return the result of executing the lambda function.
+ */
+ public <T> T execute(Function<Consumer, T> function) {
+ // Kafka consumer is not thread-safe
+ synchronized (kafkaConsumer) {
+ return function.apply(kafkaConsumer);
+ }
+ }
+
+ /**
+ * Closes the underlying kafka consumer.
+ */
+ public void close() {
+ synchronized (kafkaConsumer) {
+ kafkaConsumer.close();
+ }
+ }
+ }
}
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 0cb2bb6..015b76a 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -22,38 +22,26 @@
package org.apache.samza.system.kafka;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.startpoint.Startpoint;
-import org.apache.samza.startpoint.StartpointOldest;
-import org.apache.samza.startpoint.StartpointSpecific;
-import org.apache.samza.startpoint.StartpointTimestamp;
-import org.apache.samza.startpoint.StartpointUpcoming;
-import org.apache.samza.startpoint.StartpointVisitor;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.util.Clock;
+import org.apache.samza.util.KafkaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -73,17 +61,16 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
private final Config config;
private final boolean fetchThresholdBytesEnabled;
private final KafkaSystemConsumerMetrics metrics;
- private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
// This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
final KafkaConsumerMessageSink messageSink;
// This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
// BlockingEnvelopMap's buffers.
- private final KafkaConsumerProxy proxy;
+ private final KafkaConsumerProxy<K, V> proxy;
- // Holds mapping from {@code TopicPartition} to registered {@code Startpoint}. This will be used in the start().
- final Map<TopicPartition, Startpoint> topicPartitionToStartpointMap = new HashMap<>();
+ // Holds the mapping between the registered TopicPartition and offset until the consumer is started.
+ Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
long perPartitionFetchThreshold;
long perPartitionFetchThresholdBytes;
@@ -111,26 +98,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
// create a sink for passing the messages between the proxy and the consumer
- this.messageSink = new KafkaConsumerMessageSink();
+ messageSink = new KafkaConsumerMessageSink();
// Create the proxy to do the actual message reading.
proxy = kafkaConsumerProxyFactory.create(this.messageSink);
LOG.info("{}: Created proxy {} ", this, proxy);
- this.kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(kafkaConsumer, proxy);
- }
-
- @VisibleForTesting
- KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
- KafkaConsumerProxy<K, V> kafkaConsumerProxy, KafkaSystemConsumerMetrics metrics, Clock clock, KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler) {
- this.kafkaConsumer = kafkaConsumer;
- this.clientId = clientId;
- this.systemName = systemName;
- this.config = config;
- this.metrics = metrics;
- this.proxy = kafkaConsumerProxy;
- this.kafkaStartpointRegistrationHandler = kafkaStartpointRegistrationHandler;
- this.messageSink = new KafkaConsumerMessageSink();
- fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
}
/**
@@ -167,12 +139,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
private void startSubscription() {
//subscribe to all the registered TopicPartitions
- Set<TopicPartition> registeredTopicPartitions = new HashSet<>(topicPartitionToStartpointMap.keySet());
- LOG.info("{}: Consumer subscribes to {}", this, registeredTopicPartitions);
+ LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToOffset.keySet());
try {
synchronized (kafkaConsumer) {
// we are using assign (and not subscribe), so we need to specify both topic and partition
- kafkaConsumer.assign(registeredTopicPartitions);
+ kafkaConsumer.assign(topicPartitionsToOffset.keySet());
}
} catch (Exception e) {
throw new SamzaException("Consumer subscription failed for " + this, e);
@@ -186,14 +157,29 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
*/
void startConsumer() {
// set the offset for each TopicPartition
- if (topicPartitionToStartpointMap.size() <= 0) {
+ if (topicPartitionsToOffset.size() <= 0) {
LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
}
- topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
- Partition partition = new Partition(topicPartition.partition());
- SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
- startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
+ topicPartitionsToOffset.forEach((topicPartition, startingOffsetString) -> {
+ long startingOffset = Long.valueOf(startingOffsetString);
+
+ try {
+ synchronized (kafkaConsumer) {
+ kafkaConsumer.seek(topicPartition, startingOffset);
+ }
+ } catch (Exception e) {
+ // all recoverable exceptions are handled by the client.
+ // if we get here there is nothing left to do but bail out.
+ String msg = String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, topicPartition);
+ LOG.error(msg, e);
+ throw new SamzaException(msg, e);
+ }
+
+ LOG.info("{}: Changing consumer's starting offset for partition {} to {}", this, topicPartition, startingOffsetString);
+
+ // add the partition to the proxy
+ proxy.addTopicPartition(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), startingOffset);
});
// start the proxy thread
@@ -219,7 +205,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
}
- int numPartitions = topicPartitionToStartpointMap.size();
+ int numPartitions = topicPartitionsToOffset.size();
if (numPartitions > 0) {
perPartitionFetchThreshold = fetchThreshold / numPartitions;
@@ -265,13 +251,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
*/
@Override
public void register(SystemStreamPartition systemStreamPartition, String offset) {
- register(systemStreamPartition, new StartpointSpecific(offset));
- }
-
- @Override
- public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
if (started.get()) {
- String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+ String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, offset: %s failed.", this, systemStreamPartition, offset);
throw new SamzaException(exceptionMessage);
}
@@ -279,16 +260,30 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
return;
}
+ LOG.info("{}: Registering ssp: {} with offset: {}", this, systemStreamPartition, offset);
- LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
-
- super.register(systemStreamPartition, startpoint);
+ super.register(systemStreamPartition, offset);
TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
- topicPartitionToStartpointMap.put(topicPartition, startpoint);
+
+ String existingOffset = topicPartitionsToOffset.get(topicPartition);
+ // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
+ if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
+ topicPartitionsToOffset.put(topicPartition, offset);
+ }
+
metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
}
+ /**
+ * Compare two String offsets.
+ * Note: KafkaSystemAdmin has an equivalent method, but using it would require instantiating a SystemAdmin for each consumer.
+ * @return see {@link Long#compareTo(Long)}
+ */
+ private static int compareOffsets(String offset1, String offset2) {
+ return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+ }
+
@Override
public String toString() {
return String.format("%s:%s", systemName, clientId);
@@ -309,11 +304,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
return super.poll(systemStreamPartitions, timeout);
}
- static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
- return new TopicAndPartition(tp.topic(), tp.partition());
+ protected static TopicAndPartition toTopicAndPartition(TopicPartition topicPartition) {
+ return new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
}
- static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+ protected static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
}
@@ -325,100 +320,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
return systemName;
}
- @VisibleForTesting
- static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
-
- private final KafkaConsumerProxy proxy;
- private final Consumer kafkaConsumer;
-
- KafkaStartpointRegistrationHandler(Consumer consumer, KafkaConsumerProxy proxy) {
- this.proxy = proxy;
- this.kafkaConsumer = consumer;
- }
-
- @Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
- TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
- long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
- LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
-
- // KafkaConsumer is not thread-safe.
- synchronized (kafkaConsumer) {
- kafkaConsumer.seek(topicPartition, offsetInStartpoint);
-
- // add the partition to the proxy
- proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
- }
- }
-
- @Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
- Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
- TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
- Map<TopicPartition, Long> topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint);
-
- // Look up the offset by timestamp.
- LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint);
- Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = new HashMap<>();
- synchronized (kafkaConsumer) {
- topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);
- }
-
- // If the timestamp does not exist for the partition, then seek the consumer to end.
- if (topicPartitionToOffsetTimestamps.get(topicPartition) == null) {
- LOG.info("Timestamp does not exist for partition: {}. Seeking the kafka consumer to the end offset.", topicPartition);
-
- // KafkaConsumer is not thread-safe.
- synchronized (kafkaConsumer) {
- kafkaConsumer.seekToEnd(ImmutableList.of(topicPartition));
-
- // add the partition to the proxy
- proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
- }
- } else {
-
- // KafkaConsumer is not thread-safe.
- synchronized (kafkaConsumer) {
- // Update the consumer fetch offsets.
- OffsetAndTimestamp offsetAndTimeStamp = topicPartitionToOffsetTimestamps.get(topicPartition);
- LOG.info("Updating the consumer fetch offsets of the topic partition: {} to {}.", topicPartition, offsetAndTimeStamp.offset());
- kafkaConsumer.seek(topicPartition, offsetAndTimeStamp.offset());
-
- // add the partition to the proxy
- proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
- }
- }
- }
-
- @Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
- TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
- Collection<TopicPartition> topicPartitions = ImmutableList.of(topicPartition);
- LOG.info("Seeking the kafka consumer to the first offset for the topic partition: {}.", topicPartitions);
-
- // KafkaConsumer is not thread-safe.
- synchronized (kafkaConsumer) {
- kafkaConsumer.seekToBeginning(topicPartitions);
- // add the partition to the proxy
- proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
- }
- }
-
- @Override
- public void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
- TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
- Collection<TopicPartition> topicPartitions = ImmutableList.of(topicPartition);
- LOG.info("Seeking the kafka consumer to the end offset for the topic partition: {}.", topicPartitions);
-
- // KafkaConsumer is not thread-safe.
- synchronized (kafkaConsumer) {
- kafkaConsumer.seekToEnd(topicPartitions);
- // add the partition to the proxy
- proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
- }
- }
- }
-
public class KafkaConsumerMessageSink {
public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index cb91a76..ea6476e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -24,12 +24,14 @@ import java.util.concurrent.atomic.AtomicLong
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import org.apache.kafka.common.PartitionInfo
-import org.apache.samza.config.{Config, ConfigException}
-import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.kafka.common.TopicPartition
+import org.apache.samza.config.Config
import org.apache.samza.execution.StreamManager
import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
import org.apache.kafka.common.errors.ReplicaNotAvailableException
import org.apache.kafka.common.protocol.Errors
+import org.apache.samza.Partition
object KafkaUtil extends Logging {
/**
@@ -52,6 +54,18 @@ object KafkaUtil extends Logging {
}
/**
+ * Converts the TopicPartition to SystemStreamPartition.
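+ * @param systemName the name of the system that the resulting SystemStreamPartition belongs to.
+ * @param topicPartition the Kafka TopicPartition to convert.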
+ * @return an instance of the SystemStreamPartition.
+ */
+ def toSystemStreamPartition(systemName: String, topicPartition: TopicPartition): SystemStreamPartition = {
+ val topic = topicPartition.topic()
+ val partition = new Partition(topicPartition.partition)
+ new SystemStreamPartition(systemName, topic, partition)
+ }
+
+ /**
* Exactly the same as Kafka's ErrorMapping.maybeThrowException
* implementation, except suppresses ReplicaNotAvailableException exceptions.
* According to the Kafka
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 9169b44..b76f4e7 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -25,16 +25,24 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
import org.apache.samza.Partition;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kafka.KafkaSystemAdmin.KafkaStartpointToOffsetResolver;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -42,10 +50,16 @@ import org.mockito.Mockito;
import static org.apache.samza.system.kafka.KafkaSystemAdmin.*;
import static org.junit.Assert.*;
-
public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
private static final String SYSTEM = "kafka";
private static final String TOPIC = "input";
+ private static final String TEST_SYSTEM = "test-system";
+ private static final String TEST_STREAM = "test-stream";
+ private static final Integer TEST_PARTITION_ID = 0;
+ private static final TopicPartition TEST_TOPIC_PARTITION = new TopicPartition(TEST_STREAM, TEST_PARTITION_ID);
+ private static final Partition TEST_PARTITION = new Partition(TEST_PARTITION_ID);
+ private static final SystemStreamPartition TEST_SYSTEM_STREAM_PARTITION = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, TEST_PARTITION);
+ private static final String TEST_OFFSET = "10";
@Test
public void testGetOffsetsAfter() {
@@ -337,4 +351,90 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
assertEquals(expectedSystemStream2Partition0Metadata, stream2PartitionMetadata.get(new Partition(0)));
assertEquals(expectedSystemStream2Partition1Metadata, stream2PartitionMetadata.get(new Partition(1)));
}
+
+ @Test
+ public void testStartpointSpecificOffsetVisitorShouldResolveToCorrectOffset() {
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+ final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+ // Resolve the startpoint to an offset.
+ String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+ Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+ }
+
+ @Test
+ public void testStartpointTimestampVisitorShouldResolveToCorrectOffset() {
+ // Define dummy variables for testing.
+ final Long testTimeStamp = 10L;
+
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+
+ final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+ final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+ final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = ImmutableMap.of(
+ TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+ // Mock the consumer interactions.
+ Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp))).thenReturn(offsetForTimesResult);
+ Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+ String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+ Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+ }
+
+ @Test
+ public void testStartpointTimestampVisitorShouldResolveToCorrectOffsetWhenTimestampDoesNotExist() {
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+ final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
+ final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = new HashMap<>();
+ offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
+
+ // Mock the consumer interactions.
+ Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L))).thenReturn(offsetForTimesResult);
+ Mockito.when(consumer.endOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
+
+ String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+ Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+
+ // Mock verifications.
+ Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L));
+ }
+
+ @Test
+ public void testStartpointOldestVisitorShouldResolveToCorrectOffset() {
+ // Define dummy variables for testing.
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+ final StartpointOldest testStartpointSpecific = new StartpointOldest();
+
+ // Mock the consumer interactions.
+ Mockito.when(consumer.beginningOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
+
+ // Resolve the startpoint to an offset.
+ String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+ Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+ }
+
+ @Test
+ public void testStartpointUpcomingVisitorShouldResolveToCorrectOffset() {
+ // Define dummy variables for testing.
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+
+ final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+ final StartpointUpcoming testStartpointSpecific = new StartpointUpcoming();
+
+ // Mock the consumer interactions.
+ Mockito.when(consumer.endOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
+
+ // Resolve the startpoint to an offset.
+ String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+ Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+ }
}
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index 91444c7..5f74e38 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -21,14 +21,11 @@
package org.apache.samza.system.kafka;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.samza.Partition;
@@ -37,16 +34,11 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.startpoint.StartpointOldest;
-import org.apache.samza.startpoint.StartpointSpecific;
-import org.apache.samza.startpoint.StartpointTimestamp;
-import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.kafka.KafkaSystemConsumer.KafkaStartpointRegistrationHandler;
-import org.apache.samza.testUtils.TestClock;
import org.apache.samza.util.Clock;
import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.util.SystemClock;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -59,16 +51,10 @@ public class TestKafkaSystemConsumer {
private static final String TEST_STREAM = "test-stream";
private static final String TEST_JOB = "test-job";
private static final String TEST_CLIENT_ID = "testClientId";
- private static final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+ private static final String BOOTSTRAP_SERVERS = "127.0.0.1:8888";
private static final String FETCH_THRESHOLD_MSGS = "50000";
private static final String FETCH_THRESHOLD_BYTES = "100000";
- private static final Integer TEST_PARTITION_ID = 0;
- private static final TopicPartition TEST_TOPIC_PARTITION = new TopicPartition(TEST_STREAM, TEST_PARTITION_ID);
- private static final Partition TEST_PARTITION = new Partition(TEST_PARTITION_ID);
- private static final SystemStreamPartition TEST_SYSTEM_STREAM_PARTITION = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, TEST_PARTITION);
- private static final String TEST_OFFSET = "10";
-
private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
final Map<String, String> map = new HashMap<>();
@@ -76,8 +62,7 @@ public class TestKafkaSystemConsumer {
map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes);
- map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
- BOOTSTRAP_SERVER);
+ map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), BOOTSTRAP_SERVERS);
map.put(JobConfig.JOB_NAME(), "jobName");
Config config = new MapConfig(map);
@@ -121,7 +106,8 @@ public class TestKafkaSystemConsumer {
}
@Test
- public void testConsumerShouldRegisterTheLatestOffsetForSSP() {
+ public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
+
KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
@@ -134,15 +120,14 @@ public class TestKafkaSystemConsumer {
consumer.register(ssp1, "3");
consumer.register(ssp2, "0");
- consumer.start();
-
- assertEquals("5", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp0))).getSpecificOffset());
- assertEquals("3", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp1))).getSpecificOffset());
- assertEquals("0", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp2))).getSpecificOffset());
+ assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
+ assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
+ assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
}
@Test
public void testFetchThresholdBytes() {
+
SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
int partitionsNum = 2;
@@ -223,158 +208,32 @@ public class TestKafkaSystemConsumer {
}
@Test
- public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
- final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
- final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
- final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
-
- final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
-
- // Mock the consumer interactions.
- Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
- Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
-
- // Invoke the consumer with startpoint.
- kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
-
- // Mock verifications.
- Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
- Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
- }
-
- @Test
- public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
- // Define dummy variables for testing.
- final Long testTimeStamp = 10L;
-
- final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
- final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
- final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
-
- final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
- final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = ImmutableMap.of(
- TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
-
- // Mock the consumer interactions.
- Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp))).thenReturn(offsetForTimesResult);
- Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
- Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+ public void testStartConsumer() {
+ final Consumer consumer = Mockito.mock(Consumer.class);
+ final KafkaConsumerProxyFactory kafkaConsumerProxyFactory = Mockito.mock(KafkaConsumerProxyFactory.class);
- kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
-
- // Mock verifications.
- Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
- Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp));
- Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
- }
-
- @Test
- public void testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist() {
- final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
- final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
- final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
-
- final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
- final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = new HashMap<>();
- offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
-
- // Mock the consumer interactions.
- Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L))).thenReturn(offsetForTimesResult);
- Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
-
- kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
-
- // Mock verifications.
- Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
- Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L));
- Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
- }
-
- @Test
- public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
- // Define dummy variables for testing.
- final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
- final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
- final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
-
- final StartpointOldest testStartpointSpecific = new StartpointOldest();
-
- // Mock the consumer interactions.
- Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(TEST_TOPIC_PARTITION));
- Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
-
- // Invoke the consumer with startpoint.
- kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
-
- // Mock verifications.
- Mockito.verify(consumer).seekToBeginning(ImmutableList.of(TEST_TOPIC_PARTITION));
- Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
- }
-
- @Test
- public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
- // Define dummy variables for testing.
- final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics = new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry());
+ final SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+ final SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+ final String testOffset = "1";
final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
- final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
- final StartpointUpcoming testStartpointSpecific = new StartpointUpcoming();
+ Mockito.when(kafkaConsumerProxyFactory.create(Mockito.anyObject())).thenReturn(kafkaConsumerProxy);
+ Mockito.doNothing().when(consumer).seek(new TopicPartition(TEST_STREAM, 0), 1);
+ Mockito.doNothing().when(consumer).seek(new TopicPartition(TEST_STREAM, 1), 1);
- // Mock the consumer interactions.
- Mockito.doNothing().when(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
- Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+ KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxyFactory,
+ kafkaSystemConsumerMetrics, new SystemClock());
+ kafkaSystemConsumer.register(testSystemStreamPartition1, testOffset);
+ kafkaSystemConsumer.register(testSystemStreamPartition2, testOffset);
- // Invoke the consumer with startpoint.
- kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
-
- // Mock verifications.
- Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
- Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
- }
-
- @Test
- public void testStartInvocationAfterStartPointsRegistrationShouldInvokeTheStartPointApplyMethod() {
- // Initialize the constants required for the test.
- final Consumer mockConsumer = Mockito.mock(Consumer.class);
- final KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics = new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry());
+ kafkaSystemConsumer.startConsumer();
- // Test system stream partitions.
- SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
- SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
- SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
- SystemStreamPartition testSystemStreamPartition4 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(3));
-
- // Different kinds of {@code Startpoint}.
- StartpointSpecific startPointSpecific = new StartpointSpecific("100");
- StartpointTimestamp startpointTimestamp = new StartpointTimestamp(100L);
- StartpointOldest startpointOldest = new StartpointOldest();
- StartpointUpcoming startpointUpcoming = new StartpointUpcoming();
-
- // Mock the visit methods of KafkaStartpointRegistrationHandler.
- KafkaSystemConsumer.KafkaStartpointRegistrationHandler mockStartPointVisitor = Mockito.mock(KafkaStartpointRegistrationHandler.class);
- Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition1, startPointSpecific);
- Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition2, startpointTimestamp);
- Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition3, startpointOldest);
- Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition4, startpointUpcoming);
-
- // Instantiate KafkaSystemConsumer for testing.
- KafkaConsumerProxy proxy = Mockito.mock(KafkaConsumerProxy.class);
- KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(mockConsumer, TEST_SYSTEM, new MapConfig(),
- TEST_CLIENT_ID, proxy, kafkaSystemConsumerMetrics, new TestClock(), mockStartPointVisitor);
-
-
- // Invoke the KafkaSystemConsumer register API with different type of startpoints.
- kafkaSystemConsumer.register(testSystemStreamPartition1, startPointSpecific);
- kafkaSystemConsumer.register(testSystemStreamPartition2, startpointTimestamp);
- kafkaSystemConsumer.register(testSystemStreamPartition3, startpointOldest);
- kafkaSystemConsumer.register(testSystemStreamPartition4, startpointUpcoming);
- kafkaSystemConsumer.start();
-
- // Mock verifications.
- Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition1, startPointSpecific);
- Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition2, startpointTimestamp);
- Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition3, startpointOldest);
- Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition4, startpointUpcoming);
+ Mockito.verify(consumer).seek(new TopicPartition(TEST_STREAM, 0), 1);
+ Mockito.verify(consumer).seek(new TopicPartition(TEST_STREAM, 1), 1);
+ Mockito.verify(kafkaConsumerProxy).start();
+ Mockito.verify(kafkaConsumerProxy).addTopicPartition(testSystemStreamPartition1, Long.valueOf(testOffset));
+ Mockito.verify(kafkaConsumerProxy).addTopicPartition(testSystemStreamPartition2, Long.valueOf(testOffset));
}
// mock kafkaConsumer and SystemConsumer