You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/07 23:49:35 UTC
samza git commit: SAMZA-1415: Add clearStream API in SystemAdmin and
remove deprecated APIs
Repository: samza
Updated Branches:
refs/heads/0.14.0 23bfaa8d1 -> 79200c735
SAMZA-1415: Add clearStream API in SystemAdmin and remove deprecated APIs
The patch does the following:
1) add clearStream() APi in SystemAdmin. Currently it's only supported in Kafka with broker configuring delete.topic.enable=true.
2) remove the deprecated APIs including createChangeLogStream(), validateChangelogStream() and createCoordinatorStream().
Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Reviewers: Jake Maes <ja...@gmail.com>
Closes #292 from xinyuiscool/SAMZA-1415
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/79200c73
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/79200c73
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/79200c73
Branch: refs/heads/0.14.0
Commit: 79200c73509727abba2d6eb68cfc45ac1d842d35
Parents: 23bfaa8
Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Authored: Thu Sep 7 16:49:20 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Sep 7 16:49:20 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/system/StreamSpec.java | 22 +++
.../org/apache/samza/system/SystemAdmin.java | 42 ++----
...inglePartitionWithoutOffsetsSystemAdmin.java | 16 --
.../samza/coordinator/JobModelManager.scala | 8 +-
.../scala/org/apache/samza/job/JobRunner.scala | 9 +-
.../samza/storage/TaskStorageManager.scala | 12 +-
.../MockCoordinatorStreamSystemFactory.java | 13 +-
.../samza/execution/TestExecutionPlanner.java | 15 --
.../samza/checkpoint/TestOffsetManager.scala | 12 --
.../samza/container/TestTaskInstance.scala | 3 -
.../samza/coordinator/TestJobCoordinator.scala | 12 --
.../samza/storage/TestTaskStorageManager.scala | 6 +-
.../elasticsearch/ElasticsearchSystemAdmin.java | 15 --
.../samza/system/hdfs/HdfsSystemAdmin.java | 15 --
.../samza/system/kafka/KafkaSystemAdmin.scala | 93 +++++++-----
.../system/kafka/TestKafkaSystemAdminJava.java | 151 ++++++++++---------
.../system/kafka/TestKafkaSystemAdmin.scala | 5 +-
.../samza/system/mock/MockSystemAdmin.java | 15 --
.../samza/test/util/SimpleSystemAdmin.java | 15 --
.../apache/samza/job/yarn/MockSystemAdmin.scala | 12 --
20 files changed, 192 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 49531dd..384fecc 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -37,6 +37,12 @@ public class StreamSpec {
private static final int DEFAULT_PARTITION_COUNT = 1;
+ // Internal changelog stream id. It is used for creating changelog StreamSpec.
+ private static final String CHANGELOG_STREAM_ID = "samza-internal-changelog-stream-id";
+
+ // Internal coordinator stream id. It is used for creating coordinator StreamSpec.
+ private static final String COORDINATOR_STREAM_ID = "samza-internal-coordinator-stream-id";
+
/**
* Unique identifier for the stream in a Samza application.
* This identifier is used as a key for stream properties in the
@@ -200,6 +206,14 @@ public class StreamSpec {
return new SystemStream(systemName, physicalName);
}
+ public boolean isChangeLogStream() {
+ return id.equals(CHANGELOG_STREAM_ID);
+ }
+
+ public boolean isCoordinatorStream() {
+ return id.equals(COORDINATOR_STREAM_ID);
+ }
+
private void validateLogicalIdentifier(String identifierName, String identifierValue) {
if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) {
throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
@@ -220,4 +234,12 @@ public class StreamSpec {
public int hashCode() {
return id.hashCode();
}
+
+ public static StreamSpec createChangeLogStreamSpec(String physicalName, String systemName, int partitionCount) {
+ return new StreamSpec(CHANGELOG_STREAM_ID, physicalName, systemName, partitionCount);
+ }
+
+ public static StreamSpec createCoordinatorStreamSpec(String physicalName, String systemName) {
+ return new StreamSpec(COORDINATOR_STREAM_ID, physicalName, systemName, 1);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index b180712..e765540 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -50,38 +50,6 @@ public interface SystemAdmin {
Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
/**
- * An API to create a change log stream
- *
- * @param streamName
- * The name of the stream to be created in the underlying stream
- * @param numOfPartitions
- * The number of partitions in the changelog stream
- * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)}
- */
- void createChangelogStream(String streamName, int numOfPartitions);
-
- /**
- * Validates change log stream
- *
- * @param streamName
- * The name of the stream to be created in the underlying stream
- * @param numOfPartitions
- * The number of partitions in the changelog stream
- * @deprecated since 0.12.1, use {@link #validateStream(StreamSpec)}
- */
- void validateChangelogStream(String streamName, int numOfPartitions);
-
- /**
- * Create a stream for the job coordinator. If the stream already exists, this
- * call should simply return.
- *
- * @param streamName
- * The name of the coordinator stream to create.
- * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)}
- */
- void createCoordinatorStream(String streamName);
-
- /**
* Compare the two offsets. -1, 0, +1 means offset1 < offset2,
* offset1 == offset2 and offset1 > offset2 respectively. Return
* null if those two offsets are not comparable
@@ -114,4 +82,14 @@ public interface SystemAdmin {
default void validateStream(StreamSpec streamSpec) throws StreamValidationException {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Clear the stream described by the spec.
+ * @param streamSpec The spec for the physical stream on the system.
+ * @return {@code true} if the stream was successfully cleared.
+ * {@code false} if clearing stream failed.
+ */
+ default boolean clearStream(StreamSpec streamSpec) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 2157e69..49f7da0 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
-import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
@@ -56,16 +55,6 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
}
@Override
- public void createChangelogStream(String streamName, int numOfPartitions) {
- throw new SamzaException("Method not implemented");
- }
-
- @Override
- public void validateChangelogStream(String streamName, int numOfPartitions) {
- throw new SamzaException("Method not implemented");
- }
-
- @Override
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
@@ -77,11 +66,6 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
}
@Override
- public void createCoordinatorStream(String streamName) {
- throw new UnsupportedOperationException("Single partition admin can't create coordinator streams.");
- }
-
- @Override
public Integer offsetComparator(String offset1, String offset2) {
return null;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 6319173..42bedec 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -304,7 +304,13 @@ object JobModelManager extends Logging {
.getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
).getAdmin(systemStream.getSystem, config)
- systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions)
+ val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogPartitions)
+ if (systemAdmin.createStream(changelogSpec)) {
+ info("Created changelog stream %s." format systemStream.getStream)
+ } else {
+ info("Changelog stream %s already exists." format systemStream.getStream)
+ }
+ systemAdmin.validateStream(changelogSpec)
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index f34db99..0e973e9 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -29,6 +29,7 @@ import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine
import org.apache.samza.runtime.ApplicationRunnerOperation
+import org.apache.samza.system.StreamSpec
import org.apache.samza.util.{Logging, Util}
import scala.collection.JavaConverters._
@@ -85,7 +86,13 @@ class JobRunner(config: Config) extends Logging {
info("Creating coordinator stream")
val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
- systemAdmin.createCoordinatorStream(coordinatorSystemStream.getStream)
+ val streamName = coordinatorSystemStream.getStream
+ val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
+ if (systemAdmin.createStream(coordinatorSpec)) {
+ info("Created coordinator stream %s." format streamName)
+ } else {
+ info("Coordinator stream %s already exists." format streamName)
+ }
if (resetJobConfig) {
info("Storing config in coordinator stream.")
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 977ac5b..0879e9a 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -25,14 +25,7 @@ import java.util
import org.apache.samza.config.StorageConfig
import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.container.TaskName
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemConsumer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamPartitionIterator
-import org.apache.samza.system.ExtendedSystemAdmin
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system._
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
import org.apache.samza.util.Clock
@@ -218,7 +211,8 @@ class TaskStorageManager(
val systemAdmin = systemAdmins
.getOrElse(systemStream.getSystem,
throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
- systemAdmin.validateChangelogStream(systemStream.getStream, changeLogStreamPartitions)
+ val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
+ systemAdmin.validateStream(changelogSpec)
}
val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 662c737..6413413 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -25,14 +25,7 @@ import org.apache.samza.config.ConfigException;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.*;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.apache.samza.util.Util;
@@ -208,8 +201,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
}
public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
- public void createCoordinatorStream(String streamName) {
+ @Override
+ public boolean createStream(StreamSpec streamSpec) {
// Do nothing.
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 2c8f682..c4fd8f7 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -93,21 +93,6 @@ public class TestExecutionPlanner {
}
@Override
- public void createChangelogStream(String streamName, int numOfPartitions) {
-
- }
-
- @Override
- public void validateChangelogStream(String streamName, int numOfPartitions) {
-
- }
-
- @Override
- public void createCoordinatorStream(String streamName) {
-
- }
-
- @Override
public Integer offsetComparator(String offset1, String offset2) {
return null;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index abfc63f..48504a9 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -340,18 +340,6 @@ class TestOffsetManager {
def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
Map[String, SystemStreamMetadata]().asJava
- override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
- override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
- override def createCoordinatorStream(streamName: String) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
override def offsetComparator(offset1: String, offset2: String) = null
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 9025077..dcb06d3 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -398,9 +398,6 @@ class TestTaskInstance {
class MockSystemAdmin extends SystemAdmin {
override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { offsets }
override def getSystemStreamMetadata(streamNames: java.util.Set[String]) = null
- override def createCoordinatorStream(stream: String) = {}
- override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {}
- override def validateChangelogStream(topicName: String, numOfPartitions: Int) = {}
override def offsetComparator(offset1: String, offset2: String) = {
offset1.toLong compare offset2.toLong
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 0b6dd8b..e6b148b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -318,18 +318,6 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
Map(streamNames.asScala.toList.head -> new SystemStreamMetadata("foo", partitionMetadata.asJava)).asJava
}
- override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
- override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
- override def createCoordinatorStream(streamName: String) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
override def offsetComparator(offset1: String, offset2: String) = null
override def getSystemStreamPartitionCounts(streamNames: util.Set[String],
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 2495baf..ea4d37b 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -95,7 +95,8 @@ class TestTaskStorageManager extends MockitoSugar {
val mockStreamMetadataCache = mock[StreamMetadataCache]
val mockSystemConsumer = mock[SystemConsumer]
val mockSystemAdmin = mock[SystemAdmin]
- doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1)
+ val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream", "kafka", 1)
+ doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
var registerOffset = "0"
when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
@@ -204,7 +205,8 @@ class TestTaskStorageManager extends MockitoSugar {
// Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
val mockStreamMetadataCache = mock[StreamMetadataCache]
val mockSystemAdmin = mock[SystemAdmin]
- doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1)
+ val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream", "kafka", 1)
+ doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
val mockSystemConsumer = mock[SystemConsumer]
when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] {
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
index 446534a..3cadce0 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
@@ -54,21 +54,6 @@ public class ElasticsearchSystemAdmin implements SystemAdmin {
}
@Override
- public void createChangelogStream(String stream, int foo) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void createCoordinatorStream(String streamName) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void validateChangelogStream(String streamName, int numOfPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public Integer offsetComparator(String offset1, String offset2) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
index f5b05fb..9251db0 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -201,21 +201,6 @@ public class HdfsSystemAdmin implements SystemAdmin {
return systemStreamMetadataMap;
}
- @Override
- public void createChangelogStream(String streamName, int numOfPartitions) {
- throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
- }
-
- @Override
- public void validateChangelogStream(String streamName, int numOfPartitions) {
- throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
- }
-
- @Override
- public void createCoordinatorStream(String streamName) {
- throw new UnsupportedOperationException("HDFS doesn't support coordinator stream.");
- }
-
/**
* Compare two multi-file style offset. A multi-file style offset consist of both
* the file index as well as the offset within that file. And the format of it is:
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index af77d5b..6e582e9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -37,9 +37,8 @@ import scala.collection.JavaConverters._
object KafkaSystemAdmin extends Logging {
- // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used.
- // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317
- val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+
+ val CLEAR_STREAM_RETRIES = 3
/**
* A helper method that takes oldest, newest, and upcoming offsets for each
@@ -328,23 +327,11 @@ class KafkaSystemAdmin(
offset
}
- override def createCoordinatorStream(streamName: String) {
- info("Attempting to create coordinator stream %s." format streamName)
-
- val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
-
- if (createStream(streamSpec)) {
- info("Created coordinator stream %s." format streamName)
- } else {
- info("Coordinator stream %s already exists." format streamName)
- }
- }
-
/**
* Helper method to use topic metadata cache when fetching metadata, so we
* don't hammer Kafka more than we need to.
*/
- protected def getTopicMetadata(topics: Set[String]) = {
+ def getTopicMetadata(topics: Set[String]) = {
new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
.getTopicInfo(topics)
}
@@ -415,7 +402,7 @@ class KafkaSystemAdmin(
* @inheritdoc
*/
override def createStream(spec: StreamSpec): Boolean = {
- val kSpec = KafkaStreamSpec.fromSpec(spec);
+ val kSpec = toKafkaSpec(spec)
var streamCreated = false
new ExponentialSleepStrategy(initialDelayMs = 500).run(
@@ -451,6 +438,23 @@ class KafkaSystemAdmin(
}
/**
+ * Converts a StreamSpec into a KafakStreamSpec. Special handling for coordinator and changelog stream.
+ * @param spec a StreamSpec object
+ * @return KafkaStreamSpec object
+ */
+ def toKafkaSpec(spec: StreamSpec): KafkaStreamSpec = {
+ if (spec.isChangeLogStream) {
+ val topicName = spec.getPhysicalName
+ val topicMeta = topicMetaInformation.getOrElse(topicName, throw new StreamValidationException("Unable to find topic information for topic " + topicName))
+ new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor, topicMeta.kafkaProps)
+ } else if (spec.isCoordinatorStream){
+ new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
+ } else {
+ KafkaStreamSpec.fromSpec(spec)
+ }
+ }
+
+ /**
* @inheritdoc
*
* Validates a stream in Kafka. Should not be called before createStream(),
@@ -491,32 +495,41 @@ class KafkaSystemAdmin(
}
/**
- * Exception to be thrown when the change log stream creation or validation has failed
- */
- class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
- def this(s: String) = this(s, null)
- }
-
- override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
- val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
- val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
+ * @inheritdoc
+ *
+ * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true".
+ * Otherwise it's a no-op.
+ */
+ override def clearStream(spec: StreamSpec): Boolean = {
+ val kSpec = KafkaStreamSpec.fromSpec(spec)
+ var retries = CLEAR_STREAM_RETRIES
+ new ExponentialSleepStrategy().run(
+ loop => {
+ val zkClient = connectZk()
+ try {
+ AdminUtils.deleteTopic(
+ zkClient,
+ kSpec.getPhysicalName)
+ } finally {
+ zkClient.close
+ }
- if (createStream(spec)) {
- info("Created changelog stream %s." format topicName)
- } else {
- info("Changelog stream %s already exists." format topicName)
- }
+ loop.done
+ },
- validateStream(spec)
- }
+ (exception, loop) => {
+ if (retries > 0) {
+ warn("Exception while trying to delete topic %s: %s. Retrying." format (spec.getPhysicalName, exception))
+ retries -= 1
+ } else {
+ warn("Fail to delete topic %s: %s" format (spec.getPhysicalName, exception))
+ loop.done
+ throw exception
+ }
+ })
- /**
- * Validates a stream in Kafka. Should not be called before createStream(),
- * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, is not read-only and
- * will auto-create a new topic.
- */
- override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
- validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions))
+ val topicMetadata = getTopicMetadata(Set(kSpec.getPhysicalName)).get(kSpec.getPhysicalName).get
+ topicMetadata.partitionsMetadata.isEmpty
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index ce59b40..51af518 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,15 +19,16 @@
package org.apache.samza.system.kafka;
+import java.util.*;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
+
+import kafka.api.TopicMetadata;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.Util;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.*;
@@ -39,53 +40,48 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
@Test
- public void testCreateCoordinatorStreamDelegatesToCreateStream() {
+ public void testCreateCoordinatorStream() {
KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new scala.collection.immutable.HashMap<>(), 1000);
SystemAdmin admin = Mockito.spy(systemAdmin);
- StreamSpec spec = new StreamSpec("testId", "testCoordinatorStream", "testSystem");
+ StreamSpec spec = StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem");
- admin.createCoordinatorStream(spec.getPhysicalName());
+ admin.createStream(spec);
admin.validateStream(spec);
Mockito.verify(admin).createStream(Mockito.any());
}
@Test
- public void testCreateChangelogStreamDelegatesToCreateStream() {
- final String STREAM = "testChangeLogStream";
- final int PARTITIONS = 12;
- final int REP_FACTOR = 3;
+ public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() {
+ final String STREAM = "test.coordinator_test.Stream";
Properties coordProps = new Properties();
- Properties changeLogProps = new Properties();
- changeLogProps.setProperty("cleanup.policy", "compact");
- changeLogProps.setProperty("segment.bytes", "139");
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
- changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
- SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
- StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS);
- admin.createChangelogStream(STREAM, PARTITIONS);
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+ StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM());
+
+ Mockito.doAnswer(invocationOnMock -> {
+ StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
+ assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
+ assertTrue(internalSpec.isCoordinatorStream());
+ assertEquals(SYSTEM(), internalSpec.getSystemName());
+ assertEquals(STREAM, internalSpec.getPhysicalName());
+ assertEquals(1, internalSpec.getPartitionCount());
+
+ return internalSpec;
+ }).when(admin).toKafkaSpec(Mockito.any());
+
+ admin.createStream(spec);
admin.validateStream(spec);
- ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
- Mockito.verify(admin).createStream(specCaptor.capture());
-
- StreamSpec internalSpec = specCaptor.getValue();
- assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
- assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
- assertEquals(SYSTEM(), internalSpec.getSystemName());
- assertEquals(STREAM, internalSpec.getPhysicalName());
- assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
- assertEquals(PARTITIONS, internalSpec.getPartitionCount());
- assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
}
@Test
- public void testCreateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
- final String STREAM = "test.Change_Log.Stream";
+ public void testCreateChangelogStream() {
+ final String STREAM = "testChangeLogStream";
final int PARTITIONS = 12;
- final int REP_FACTOR = 3;
+ final int REP_FACTOR = 1;
Properties coordProps = new Properties();
Properties changeLogProps = new Properties();
@@ -94,60 +90,56 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
- SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
- StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS);
- admin.createChangelogStream(STREAM, PARTITIONS);
- admin.validateStream(spec);
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+ StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
- ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
- Mockito.verify(admin).createStream(specCaptor.capture());
-
- StreamSpec internalSpec = specCaptor.getValue();
- assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
- assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
- assertEquals(SYSTEM(), internalSpec.getSystemName());
- assertEquals(STREAM, internalSpec.getPhysicalName());
- assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
- assertEquals(PARTITIONS, internalSpec.getPartitionCount());
- assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
- }
+ Mockito.doAnswer(invocationOnMock -> {
+ StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
+ assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
+ assertTrue(internalSpec.isChangeLogStream());
+ assertEquals(SYSTEM(), internalSpec.getSystemName());
+ assertEquals(STREAM, internalSpec.getPhysicalName());
+ assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+ assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+ assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
- @Test
- public void testValidateChangelogStreamDelegatesToValidateStream() {
- final String STREAM = "testChangeLogValidate";
- Properties coordProps = new Properties();
- Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
- changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
-
- KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap));
- SystemAdmin admin = Mockito.spy(systemAdmin);
- StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
+ return internalSpec;
+ }).when(admin).toKafkaSpec(Mockito.any());
- admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
+ admin.createStream(spec);
admin.validateStream(spec);
- admin.validateChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
-
- Mockito.verify(admin).createStream(Mockito.any());
- Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
}
@Test
- public void testValidateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
- final String STREAM = "test.Change_Log.Validate";
+ public void testCreateChangelogStreamWithSpecialCharsInTopicName() {
+ final String STREAM = "test.Change_Log.Stream";
+ final int PARTITIONS = 12;
+ final int REP_FACTOR = 1;
+
Properties coordProps = new Properties();
+ Properties changeLogProps = new Properties();
+ changeLogProps.setProperty("cleanup.policy", "compact");
+ changeLogProps.setProperty("segment.bytes", "139");
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
- changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
-
- KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap));
- SystemAdmin admin = Mockito.spy(systemAdmin);
- StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
+ changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
- admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
+ KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+ StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
+ Mockito.doAnswer(invocationOnMock -> {
+ StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
+ assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
+ assertTrue(internalSpec.isChangeLogStream());
+ assertEquals(SYSTEM(), internalSpec.getSystemName());
+ assertEquals(STREAM, internalSpec.getPhysicalName());
+ assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+ assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+ assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
+
+ return internalSpec;
+ }).when(admin).toKafkaSpec(Mockito.any());
+
+ admin.createStream(spec);
admin.validateStream(spec);
- admin.validateChangelogStream(STREAM, spec.getPartitionCount()); // Should not throw
-
- Mockito.verify(admin).createStream(Mockito.any());
- Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
}
@Test
@@ -191,4 +183,17 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
admin.validateStream(spec2);
}
+
+ @Test
+ public void testClearStream() {
+ KafkaSystemAdmin admin = this.basicSystemAdmin;
+ StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
+
+ assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
+ assertTrue(admin.clearStream(spec));
+
+ scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
+ scala.collection.immutable.Map<String, TopicMetadata> metadata = admin.getTopicMetadata(topic);
+ assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty());
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 19f3903..6fb03a1 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.samza.Partition
import org.apache.samza.config.KafkaProducerConfig
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore}
import org.junit.Assert._
import org.junit._
@@ -283,7 +283,8 @@ class TestKafkaSystemAdmin {
val topic = "test-coordinator-stream"
val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
- systemAdmin.createCoordinatorStream(topic)
+ val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
+ systemAdmin.createStream(spec)
validateTopic(topic, 1)
val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
assertTrue(topicMetadataMap.contains(topic))
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index a05f89a..322b367 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -66,21 +66,6 @@ public class MockSystemAdmin implements SystemAdmin {
}
@Override
- public void createChangelogStream(String streamName, int numOfPartitions) {
- throw new UnsupportedOperationException("Method not implemented");
- }
-
- @Override
- public void validateChangelogStream(String streamName, int numOfPartitions) {
- throw new UnsupportedOperationException("Method not implemented");
- }
-
- @Override
- public void createCoordinatorStream(String streamName) {
- throw new UnsupportedOperationException("Method not implemented.");
- }
-
- @Override
public Integer offsetComparator(String offset1, String offset2) {
return null;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
index 41f01c5..8890a2f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
+++ b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
@@ -63,21 +63,6 @@ public class SimpleSystemAdmin implements SystemAdmin {
}
@Override
- public void createChangelogStream(String streamName, int numOfPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void validateChangelogStream(String streamName, int numOfPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void createCoordinatorStream(String streamName) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public Integer offsetComparator(String offset1, String offset2) {
if (offset1 == null) {
return offset2 == null ? 0 : -1;
http://git-wip-us.apache.org/repos/asf/samza/blob/79200c73/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
index c320a97..5650d4b 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
@@ -38,17 +38,5 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
}).toMap.asJava
}
- override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
- override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
- override def createCoordinatorStream(streamName: String) {
- new UnsupportedOperationException("Method not implemented.")
- }
-
override def offsetComparator(offset1: String, offset2: String) = null
}