You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/04/18 22:31:10 UTC
samza git commit: SAMZA-1478: Delete unneeded data from intermediate
Kafka topic on offset commit
Repository: samza
Updated Branches:
refs/heads/master 5d73ecdad -> 3cc2a05f7
SAMZA-1478: Delete unneeded data from intermediate Kafka topic on offset commit
Author: Dong Lin <li...@gmail.com>
Author: Dong Lin <do...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>, Jacob Maes <jm...@apache.org>, Yi Pan <ni...@gmail.com>
Closes #347 from lindong28/SAMZA-1478
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3cc2a05f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3cc2a05f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3cc2a05f
Branch: refs/heads/master
Commit: 3cc2a05f7009c34759baf1533f0a0e58f75839de
Parents: 5d73ecd
Author: Dong Lin <li...@gmail.com>
Authored: Wed Apr 18 15:31:06 2018 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Apr 18 15:31:06 2018 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 20 ++++
.../org/apache/samza/system/SystemAdmin.java | 12 +-
.../org/apache/samza/execution/StreamEdge.java | 1 +
.../apache/samza/execution/StreamManager.java | 2 +-
.../runtime/AbstractApplicationRunner.java | 4 +-
.../org/apache/samza/task/AsyncRunLoop.java | 2 +-
.../org/apache/samza/config/StreamConfig.scala | 13 ++-
.../org/apache/samza/config/SystemConfig.scala | 7 ++
.../apache/samza/container/SamzaContainer.scala | 5 +-
.../apache/samza/container/TaskInstance.scala | 14 ++-
.../apache/samza/execution/TestStreamEdge.java | 4 +-
.../samza/system/kafka/KafkaSystemAdmin.scala | 52 ++++++++-
.../samza/system/kafka/KafkaSystemFactory.scala | 10 +-
.../system/kafka/TestKafkaSystemAdminJava.java | 36 +++---
.../system/kafka/TestKafkaSystemAdmin.scala | 32 +++--
.../samza/test/operator/BroadcastAssertApp.java | 9 +-
.../test/operator/RepartitionJoinWindowApp.java | 26 +++--
...StreamApplicationIntegrationTestHarness.java | 18 ++-
.../operator/TestRepartitionJoinWindowApp.java | 117 +++++++++++++++----
.../AbstractIntegrationTestHarness.scala | 8 +-
20 files changed, 290 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 4495cb1..86ac427 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -951,6 +951,16 @@
</tr>
<tr>
+ <td class="property" id="systems-samza-delete-committed-messages">systems.<span class="system">system-name</span>.<br>samza.delete.committed.messages</td>
+ <td class="default">false</td>
+ <td class="description">
+ If set to true, automatically delete committed messages from streams whose committed messages can be deleted.
+ A stream's committed messages can be deleted if it is an intermediate stream, or if the user has manually
+ set <a href="#streams-samza-delete-committed-messages">streams.<span class="stream">stream-id</span>.<br>samza.delete.committed.messages</a> to true in the configuration.
+ </td>
+ </tr>
+
+ <tr>
<td class="property" id="systems-samza-key-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
<td class="default" rowspan="2"></td>
<td class="description">
@@ -1105,6 +1115,16 @@
</tr>
<tr>
+ <td class="property" id="streams-samza-delete-committed-messages">streams.<span class="stream">stream-id</span>.<br>samza.delete.committed.messages</td>
+ <td class="default">false</td>
+ <td class="description">
+ If set to true, committed messages of this stream can be deleted. Committed messages of this stream will be deleted
+ if <a href="#systems-samza-delete-committed-messages">systems.<span class="system">system-name</span>.samza.delete.committed.messages</a> is also
+ set to true.
+ </td>
+ </tr>
+
+ <tr>
<td class="property" id="streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
<td class="default">false</td>
<td class="description">
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index dce7030..13566a6 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -95,7 +95,7 @@ public interface SystemAdmin {
}
/**
- * Clear the stream described by the spec.
+ * Clear the entire stream described by the spec.
* @param streamSpec The spec for the physical stream on the system.
* @return {@code true} if the stream was successfully cleared.
* {@code false} if clearing stream failed.
@@ -103,4 +103,14 @@ public interface SystemAdmin {
default boolean clearStream(StreamSpec streamSpec) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Delete records up to (and including) the provided ssp offsets for all system stream partitions specified in the map
+ *
+ * @param offsets a map from system stream partition to offset
+ */
+ default void deleteMessages(Map<SystemStreamPartition, String> offsets) {
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index bc08e70..b4c93d9 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -124,6 +124,7 @@ public class StreamEdge {
config.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
if (isIntermediate()) {
config.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
+ config.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index 3c1818f..b0473c1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -102,7 +102,7 @@ public class StreamManager {
//Find all intermediate streams and clean up
Set<StreamSpec> intStreams = JavaConversions.asJavaCollection(streamConfig.getStreamIds()).stream()
- .filter(streamConfig::getIsIntermediate)
+ .filter(streamConfig::getIsIntermediateStream)
.map(id -> new StreamSpec(id, streamConfig.getPhysicalName(id), streamConfig.getSystem(id)))
.collect(Collectors.toSet());
intStreams.forEach(stream -> {
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 609b0ec..68962ce 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -119,8 +119,7 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
return new StreamSpec(streamId, physicalName, system, isBounded, properties);
}
- /* package private */
- ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
+ public ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
return getExecutionPlan(app, null);
}
@@ -171,4 +170,5 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
log.warn("Failed to write execution plan json to file", e);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index f3c4655..3b3e008 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -347,7 +347,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
this.task = task;
this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock);
Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
- this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet, task.hasIntermediateStreams());
+ this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet, task.intermediateStreams().nonEmpty());
}
private void init() {
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index db86969..0a4623e 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -37,6 +37,7 @@ object StreamConfig {
val BOOTSTRAP = SAMZA_PROPERTY + "bootstrap"
val PRIORITY = SAMZA_PROPERTY + "priority"
val IS_INTERMEDIATE = SAMZA_PROPERTY + "intermediate"
+ val DELETE_COMMITTED_MESSAGES = SAMZA_PROPERTY + "delete.committed.messages"
val IS_BOUNDED = SAMZA_PROPERTY + "bounded"
// We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values.
@@ -47,6 +48,7 @@ object StreamConfig {
val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
val IS_INTERMEDIATE_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_INTERMEDIATE
+ val DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID = STREAM_ID_PREFIX + DELETE_COMMITTED_MESSAGES
val IS_BOUNDED_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_BOUNDED
val PRIORITY_FOR_STREAM_ID = STREAM_ID_PREFIX + PRIORITY
val CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT
@@ -166,10 +168,19 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
* @param streamId the identifier for the stream in the config.
* @return true if the stream is intermediate
*/
- def getIsIntermediate(streamId: String) = {
+ def getIsIntermediateStream(streamId: String) = {
getBoolean(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID format streamId, false)
}
+ /**
+ * Gets the boolean flag of whether the committed messages of the specified streamId can be deleted
+ * @param streamId the identifier for the stream in the config.
+ * @return true if the committed messages of the stream can be deleted
+ */
+ def getDeleteCommittedMessages(streamId: String) = {
+ getBoolean(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID format streamId, false)
+ }
+
def getIsBounded(streamId: String) = {
getBoolean(StreamConfig.IS_BOUNDED_FOR_STREAM_ID format streamId, false)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 91fb261..bebdbd8 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -31,6 +31,11 @@ object SystemConfig {
val SYSTEM_FACTORY = JavaSystemConfig.SYSTEM_FACTORY_FORMAT
val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
+ // If true, automatically delete committed messages from streams whose committed messages can be deleted.
+ // A stream's committed messages can be deleted if it is an intermediate stream, or if the user has manually
+ // set streams.{streamId}.samza.delete.committed.messages to true in the configuration.
+ val DELETE_COMMITTED_MESSAGES = SYSTEM_PREFIX + "samza.delete.committed.messages"
+
implicit def Config2System(config: Config) = new SystemConfig(config)
}
@@ -45,6 +50,8 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
+ def deleteCommittedMessages(systemName: String) = getOption(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName))
+
/**
* Returns a list of all system names from the config file. Useful for
* getting individual systems.
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 1641511..7aec8e1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -288,7 +288,7 @@ object SamzaContainer extends Logging {
val intermediateStreams = config
.getStreamIds
- .filter(config.getIsIntermediate(_))
+ .filter(config.getIsIntermediateStream(_))
.toList
info("Got intermediate streams: %s" format intermediateStreams)
@@ -623,8 +623,6 @@ object SamzaContainer extends Logging {
} else {
info(s"Disk quotas disabled because polling interval is not set ($DISK_POLL_INTERVAL_KEY)")
}
-
-
info("Samza container setup complete.")
new SamzaContainer(
@@ -1000,7 +998,6 @@ class SamzaContainer(
def shutdownTask {
info("Shutting down task instance stream tasks.")
-
if (taskThreadPool != null) {
info("Shutting down task thread pool")
try {
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 3ac37c6..61e8c77 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -37,6 +37,7 @@ import org.apache.samza.util.Logging
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
+import scala.collection.Map
class TaskInstance(
val task: Any,
@@ -72,7 +73,9 @@ class TaskInstance(
scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false)
- val hasIntermediateStreams = config.getStreamIds.exists(config.getIsIntermediate(_))
+ val intermediateStreams: Set[String] = config.getStreamIds.filter(config.getIsIntermediateStream).toSet
+
+ val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
def registerMetrics {
debug("Registering metrics for taskName: %s" format taskName)
@@ -218,6 +221,15 @@ class TaskInstance(
trace("Checkpointing offsets for taskName: %s" format taskName)
offsetManager.writeCheckpoint(taskName, checkpoint)
+
+ if (checkpoint != null) {
+ checkpoint.getOffsets.asScala
+ .filter { case (ssp, _) => streamsToDeleteCommittedMessages.contains(ssp.getStream) } // Only delete data of streams configured for committed-message deletion
+ .groupBy { case (ssp, _) => ssp.getSystem }
+ .foreach { case (systemName: String, offsets: Map[SystemStreamPartition, String]) =>
+ systemAdmins.getSystemAdmin(systemName).deleteMessages(offsets.asJava)
+ }
+ }
}
def shutdownTask {
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 424f102..8ad5b7e 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -63,7 +63,7 @@ public class TestStreamEdge {
StreamConfig streamConfig = new StreamConfig(config);
assertEquals(streamConfig.getSystem(spec.getId()), "system-1");
assertEquals(streamConfig.getPhysicalName(spec.getId()), "physical-stream-1");
- assertEquals(streamConfig.getIsIntermediate(spec.getId()), false);
+ assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), false);
assertEquals(streamConfig.getIsBounded(spec.getId()), false);
assertEquals(streamConfig.getStreamProperties(spec.getId()).get("property1"), "haha");
@@ -78,7 +78,7 @@ public class TestStreamEdge {
edge = new StreamEdge(spec, true, new MapConfig());
config = edge.generateConfig();
streamConfig = new StreamConfig(config);
- assertEquals(streamConfig.getIsIntermediate(spec.getId()), true);
+ assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), true);
assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), "oldest");
assertEquals(streamConfig.getPriority(spec.toSystemStream()), Integer.MAX_VALUE);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 1f4672d..c76f6e5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -22,12 +22,15 @@ package org.apache.samza.system.kafka
import java.util
import java.util.{Properties, UUID}
-import kafka.admin.AdminUtils
+import com.google.common.annotations.VisibleForTesting
+import kafka.admin.{AdminClient, AdminUtils}
import kafka.api._
import kafka.common.TopicAndPartition
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import kafka.utils.ZkUtils
+import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.TopicPartition
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system._
import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
@@ -38,6 +41,7 @@ import scala.collection.JavaConverters._
object KafkaSystemAdmin extends Logging {
+ @VisibleForTesting @volatile var deleteMessagesCalled = false
val CLEAR_STREAM_RETRIES = 3
/**
@@ -144,20 +148,38 @@ class KafkaSystemAdmin(
/**
* Kafka properties to be used during the intermediate topic creation
*/
- intermediateStreamProperties: Map[String, Properties] = Map()) extends ExtendedSystemAdmin with Logging {
+ intermediateStreamProperties: Map[String, Properties] = Map(),
+
+ /**
+ * Whether deleteMessages() API can be used
+ */
+ deleteCommittedMessages: Boolean = false) extends ExtendedSystemAdmin with Logging {
import KafkaSystemAdmin._
@volatile var running = false
+ @volatile var adminClient: AdminClient = null
override def start() = {
- running = true
+ if (!running) {
+ running = true
+ adminClient = createAdminClient()
+ }
}
override def stop() = {
- running = false
+ if (running) {
+ running = false
+ adminClient.close()
+ adminClient = null
+ }
}
+ private def createAdminClient(): AdminClient = {
+ val props = new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListString)
+ AdminClient.create(props)
+ }
override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL)
@@ -555,13 +577,33 @@ class KafkaSystemAdmin(
}
/**
+ * @inheritdoc
+ *
+ * Delete records up to (and including) the provided ssp offsets for all system stream partitions specified in the map
+ * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
+ */
+ override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) {
+ deleteMessagesCalled = true
+
+ if (!running) {
+ throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName")
+ }
+ if (deleteCommittedMessages) {
+ val nextOffsets = offsets.asScala.toSeq.map { case (systemStreamPartition, offset) =>
+ (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
+ }.toMap
+ adminClient.deleteRecordsBefore(nextOffsets)
+ }
+ }
+
+ /**
* Compare the two offsets. Returns x where x < 0 if offset1 < offset2;
* x == 0 if offset1 == offset2; x > 0 if offset1 > offset2.
*
* Currently it's used in the context of the broadcast streams to detect
* the mismatch between two streams when consuming the broadcast streams.
*/
- override def offsetComparator(offset1: String, offset2: String) = {
+ override def offsetComparator(offset1: String, offset2: String): Integer = {
offset1.toLong compare offset2.toLong
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index a480042..9f0b5f2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -33,6 +33,7 @@ import org.apache.samza.system.SystemFactory
import org.apache.samza.config.StorageConfig._
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.SystemAdmin
+import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.system.SystemConsumer
object KafkaSystemFactory extends Logging {
@@ -124,9 +125,9 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
(topicName, changelogInfo)
- }}.toMap
-
+ }}
+ val deleteCommittedMessages = config.deleteCommittedMessages(systemName).exists(isEnabled => isEnabled.toBoolean)
val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
new KafkaSystemAdmin(
systemName,
@@ -138,7 +139,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
bufferSize,
clientId,
topicMetaInformation,
- intermediateStreamProperties)
+ intermediateStreamProperties,
+ deleteCommittedMessages)
}
def getCoordinatorTopicProperties(config: Config) = {
@@ -152,7 +154,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val appConfig = new ApplicationConfig(config)
if (appConfig.getAppMode == ApplicationMode.BATCH) {
val streamConfig = new StreamConfig(config)
- streamConfig.getStreamIds().filter(streamConfig.getIsIntermediate(_)).map(streamId => {
+ streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => {
val properties = new Properties()
properties.putAll(streamConfig.getStreamProperties(streamId))
properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 125cf61..77f47f9 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -28,6 +28,8 @@ import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -36,18 +38,13 @@ import static org.junit.Assert.*;
public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
- KafkaSystemAdmin basicSystemAdmin = createSystemAdmin();
-
-
@Test
public void testCreateCoordinatorStream() {
- KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new scala.collection.immutable.HashMap<>(), 1000);
- SystemAdmin admin = Mockito.spy(systemAdmin);
+ SystemAdmin admin = Mockito.spy(systemAdmin());
StreamSpec spec = StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem");
admin.createStream(spec);
admin.validateStream(spec);
-
Mockito.verify(admin).createStream(Mockito.any());
}
@@ -144,56 +141,51 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
@Test
public void testCreateStream() {
- SystemAdmin admin = this.basicSystemAdmin;
StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
- assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
- admin.validateStream(spec);
+ assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+ systemAdmin().validateStream(spec);
- assertFalse("createStream should return false if the stream already exists.", admin.createStream(spec));
+ assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
}
@Test(expected = StreamValidationException.class)
public void testValidateStreamDoesNotExist() {
- SystemAdmin admin = this.basicSystemAdmin;
StreamSpec spec = new StreamSpec("testId", "testStreamNameExist", "testSystem", 8);
- admin.validateStream(spec);
+ systemAdmin().validateStream(spec);
}
@Test(expected = StreamValidationException.class)
public void testValidateStreamWrongPartitionCount() {
- SystemAdmin admin = this.basicSystemAdmin;
StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
- assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1));
+ assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
- admin.validateStream(spec2);
+ systemAdmin().validateStream(spec2);
}
@Test(expected = StreamValidationException.class)
public void testValidateStreamWrongName() {
- SystemAdmin admin = this.basicSystemAdmin;
StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
- assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1));
+ assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
- admin.validateStream(spec2);
+ systemAdmin().validateStream(spec2);
}
@Test
public void testClearStream() {
- KafkaSystemAdmin admin = this.basicSystemAdmin;
StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
- assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
- assertTrue(admin.clearStream(spec));
+ assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+ assertTrue(systemAdmin().clearStream(spec));
scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
- scala.collection.immutable.Map<String, TopicMetadata> metadata = admin.getTopicMetadata(topic);
+ scala.collection.immutable.Map<String, TopicMetadata> metadata = systemAdmin().getTopicMetadata(topic);
assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 2039447..a533acc 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -26,7 +26,7 @@ import java.util.{Properties, UUID}
import kafka.admin.AdminUtils
import org.apache.kafka.common.errors.LeaderNotAvailableException
import org.apache.kafka.common.protocol.Errors
-import kafka.consumer.{Consumer, ConsumerConfig}
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestUtils, ZkUtils}
@@ -59,16 +59,16 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
var metadataStore: TopicMetadataStore = null
var producerConfig: KafkaProducerConfig = null
- var brokers: String = null
+ var systemAdmin: KafkaSystemAdmin = null
- def generateConfigs() = {
+ override def generateConfigs(): Seq[KafkaConfig] = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
props.map(KafkaConfig.fromProps)
}
@BeforeClass
- override def setUp {
- super.setUp
+ override def setUp() {
+ super.setUp()
val config = new java.util.HashMap[String, String]()
config.put("bootstrap.servers", brokerList)
config.put("acks", "all")
@@ -76,15 +76,17 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
producerConfig = new KafkaProducerConfig("kafka", "i001", config)
producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
+ systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+ systemAdmin.start()
}
-
@AfterClass
- override def tearDown {
- super.tearDown
+ override def tearDown() {
+ systemAdmin.stop()
+ producer.close()
+ super.tearDown()
}
-
def createTopic(topicName: String, partitionCount: Int) {
AdminUtils.createTopic(
zkUtils,
@@ -119,7 +121,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
}
}
- def getConsumerConnector = {
+ def getConsumerConnector(): ConsumerConnector = {
val props = new Properties
props.put("zookeeper.connect", zkConnect)
@@ -130,12 +132,9 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
Consumer.create(consumerConfig)
}
- def createSystemAdmin: KafkaSystemAdmin = {
- new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
- }
-
def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
- new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
+ new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties,
+ coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
}
}
@@ -147,9 +146,6 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
class TestKafkaSystemAdmin {
import TestKafkaSystemAdmin._
- // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
- val systemAdmin = createSystemAdmin
-
@Test
def testShouldAssembleMetadata {
val oldestOffsets = Map(
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
index 426a53b..88e2765 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
@@ -29,15 +29,18 @@ import org.apache.samza.test.framework.StreamAssert;
import java.util.Arrays;
-import static org.apache.samza.test.operator.RepartitionJoinWindowApp.PAGE_VIEWS;
-
public class BroadcastAssertApp implements StreamApplication {
+ public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
+
+
@Override
public void init(StreamGraph graph, Config config) {
+ String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
+
final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
final MessageStream<PageView> broadcastPageViews = graph
- .getInputStream(PAGE_VIEWS, serde)
+ .getInputStream(inputTopic, serde)
.broadcast(serde, "pv");
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 346e958..120f902 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -23,13 +23,15 @@ import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.test.operator.data.AdClick;
import org.apache.samza.test.operator.data.PageView;
import org.apache.samza.test.operator.data.UserPageAdClick;
@@ -40,16 +42,19 @@ import java.time.Duration;
* A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count.
*/
public class RepartitionJoinWindowApp implements StreamApplication {
- static final String PAGE_VIEWS = "page-views";
- static final String AD_CLICKS = "ad-clicks";
- static final String OUTPUT_TOPIC = "user-ad-click-counts";
+
+ public static final String INPUT_TOPIC_NAME_1_PROP = "inputTopicName1";
+ public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2";
+ public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName";
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, new JsonSerdeV2<>(PageView.class));
- MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICKS, new JsonSerdeV2<>(AdClick.class));
- OutputStream<KV<String, String>> outputStream =
- graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde()));
+ String inputTopicName1 = config.get(INPUT_TOPIC_NAME_1_PROP);
+ String inputTopicName2 = config.get(INPUT_TOPIC_NAME_2_PROP);
+ String outputTopic = config.get(OUTPUT_TOPIC_NAME_PROP);
+
+ MessageStream<PageView> pageViews = graph.getInputStream(inputTopicName1, new JsonSerdeV2<>(PageView.class));
+ MessageStream<AdClick> adClicks = graph.getInputStream(inputTopicName2, new JsonSerdeV2<>(AdClick.class));
MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews
.partitionBy(PageView::getViewId, pv -> pv,
@@ -73,7 +78,10 @@ public class RepartitionJoinWindowApp implements StreamApplication {
.window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3),
new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow")
.map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
- .sendTo(outputStream);
+ .sink((message, messageCollector, taskCoordinator) -> {
+ taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
+ messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
+ });
}
private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
index fae3db4..1595347 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
@@ -27,10 +27,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.execution.TestStreamManager;
+import org.apache.samza.runtime.AbstractApplicationRunner;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.kafka.KafkaSystemAdmin;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.apache.samza.test.framework.StreamAssert;
import scala.Option;
@@ -98,8 +100,9 @@ import java.util.Properties;
public class StreamApplicationIntegrationTestHarness extends AbstractIntegrationTestHarness {
private KafkaProducer producer;
private KafkaConsumer consumer;
+ protected KafkaSystemAdmin systemAdmin;
private StreamApplication app;
- private ApplicationRunner runner;
+ protected AbstractApplicationRunner runner;
private int numEmptyPolls = 3;
private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
@@ -144,6 +147,9 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
Option$.MODULE$.<File>empty(),
Option$.MODULE$.<Properties>empty(),
Option$.MODULE$.<Properties>apply(consumerDeserializerProperties));
+
+ systemAdmin = createSystemAdmin("kafka");
+ systemAdmin.start();
}
/**
@@ -213,7 +219,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
* @param appName the name of the application
* @param overriddenConfigs configs to override
*/
- public void runApplication(StreamApplication streamApplication, String appName, Config overriddenConfigs) {
+ public void runApplication(StreamApplication streamApplication, String appName, Map<String, String> overriddenConfigs) {
Map<String, String> configs = new HashMap<>();
configs.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory");
configs.put("job.name", appName);
@@ -230,6 +236,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
configs.put("job.default.system", "kafka");
configs.put("job.coordinator.replication.factor", "1");
configs.put("task.window.ms", "1000");
+ configs.put("task.checkpoint.factory", TestStreamManager.MockCheckpointManagerFactory.class.getName());
// This is to prevent tests from taking a long time to stop after they're done. The issue is that
// tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately.
@@ -247,7 +254,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
}
app = streamApplication;
- runner = ApplicationRunner.fromConfig(new MapConfig(configs));
+ runner = (AbstractApplicationRunner) ApplicationRunner.fromConfig(new MapConfig(configs));
runner.run(streamApplication);
StreamAssert.waitForComplete();
@@ -262,6 +269,9 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
*/
@Override
public void tearDown() {
+ systemAdmin.stop();
+ producer.close();
+ consumer.close();
super.tearDown();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index a83f9cf..77cd19a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -18,56 +18,99 @@
*/
package org.apache.samza.test.operator;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.samza.Partition;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.kafka.KafkaSystemAdmin;
+import org.apache.samza.util.ExponentialSleepStrategy;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
-import static org.apache.samza.test.operator.RepartitionJoinWindowApp.AD_CLICKS;
-import static org.apache.samza.test.operator.RepartitionJoinWindowApp.OUTPUT_TOPIC;
-import static org.apache.samza.test.operator.RepartitionJoinWindowApp.PAGE_VIEWS;
-
/**
* Test driver for {@link RepartitionJoinWindowApp}.
*/
public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTestHarness {
- @Before
- public void setup() {
+ void initializeTopics(String input1, String input2, String output) {
// create topics
- createTopic(PAGE_VIEWS, 2);
- createTopic(AD_CLICKS, 2);
- createTopic(OUTPUT_TOPIC, 1);
+ createTopic(input1, 2);
+ createTopic(input2, 2);
+ createTopic(output, 1);
// create events for the following user activity.
// userId: (viewId, pageId, (adIds))
// u1: (v1, p1, (a1)), (v2, p2, (a3))
// u2: (v3, p1, (a1)), (v4, p3, (a5))
- produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
- produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
- produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
- produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+ produceMessage(input1, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+ produceMessage(input1, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+ produceMessage(input1, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+ produceMessage(input1, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+
+ produceMessage(input2, 0, "a1", "{\"viewId\":\"v1\",\"adId\":\"a1\"}");
+ produceMessage(input2, 0, "a3", "{\"viewId\":\"v2\",\"adId\":\"a3\"}");
+ produceMessage(input2, 0, "a1", "{\"viewId\":\"v3\",\"adId\":\"a1\"}");
+ produceMessage(input2, 0, "a5", "{\"viewId\":\"v4\",\"adId\":\"a5\"}");
+ }
+
+ @Test
+ public void testRepartitionJoinWindowAppWithoutDeletionOnCommit() throws Exception {
+ String inputTopicName1 = "page-views";
+ String inputTopicName2 = "ad-clicks";
+ String outputTopicName = "user-ad-click-counts";
+
+ KafkaSystemAdmin.deleteMessagesCalled_$eq(false);
- produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v1\",\"adId\":\"a1\"}");
- produceMessage(AD_CLICKS, 0, "a3", "{\"viewId\":\"v2\",\"adId\":\"a3\"}");
- produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v3\",\"adId\":\"a1\"}");
- produceMessage(AD_CLICKS, 0, "a5", "{\"viewId\":\"v4\",\"adId\":\"a5\"}");
+ initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
+ // run the application
+ RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
+ String appName = "UserPageAdClickCounter";
+ Map<String, String> configs = new HashMap<>();
+ configs.put("systems.kafka.samza.delete.committed.messages", "false");
+ configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_1_PROP, inputTopicName1);
+ configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2);
+ configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName);
+
+ runApplication(app, appName, configs);
+
+ // consume and validate result
+ List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2);
+ Assert.assertEquals(2, messages.size());
+
+ Assert.assertFalse(KafkaSystemAdmin.deleteMessagesCalled());
}
@Test
- public void testRepartitionJoinWindowApp() throws Exception {
+ public void testRepartitionJoinWindowAppAndDeleteMessagesOnCommit() throws Exception {
+ String inputTopicName1 = "page-views2";
+ String inputTopicName2 = "ad-clicks2";
+ String outputTopicName = "user-ad-click-counts2";
+
+ initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
+
// run the application
RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
- final String appName = "UserPageAdClickCounter";
- runApplication(app, appName, null);
+ final String appName = "UserPageAdClickCounter2";
+ Map<String, String> configs = new HashMap<>();
+ configs.put("systems.kafka.samza.delete.committed.messages", "true");
+ configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_1_PROP, inputTopicName1);
+ configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2);
+ configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName);
+
+ runApplication(app, appName, configs);
// consume and validate result
- List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
+ List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2);
Assert.assertEquals(2, messages.size());
for (ConsumerRecord<String, String> message : messages) {
@@ -76,10 +119,38 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
Assert.assertTrue(key.equals("u1") || key.equals("u2"));
Assert.assertEquals("2", value);
}
+
+ // Verify that all messages in the intermediate streams are deleted within 10 seconds overall
+ long startTimeMs = System.currentTimeMillis();
+ for (StreamSpec spec: runner.getExecutionPlan(app).getIntermediateStreams()) {
+ long remainingMessageNum = -1;
+
+ while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) {
+ remainingMessageNum = 0;
+ SystemStreamMetadata metadatas = systemAdmin.getSystemStreamMetadata(
+ new HashSet<>(Arrays.asList(spec.getPhysicalName())), new ExponentialSleepStrategy.Mock(3)
+ ).get(spec.getPhysicalName()).get();
+
+ for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : metadatas.getSystemStreamPartitionMetadata().entrySet()) {
+ SystemStreamPartitionMetadata metadata = entry.getValue();
+ remainingMessageNum += Long.parseLong(metadata.getUpcomingOffset()) - Long.parseLong(metadata.getOldestOffset());
+ }
+ }
+ Assert.assertEquals(0, remainingMessageNum);
+ }
+
+
}
@Test
public void testBroadcastApp() {
- runApplication(new BroadcastAssertApp(), "BroadcastTest", null);
+ String inputTopicName1 = "page-views";
+ String inputTopicName2 = "ad-clicks";
+ String outputTopicName = "user-ad-click-counts";
+ Map<String, String> configs = new HashMap<>();
+ configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1);
+
+ initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
+ runApplication(new BroadcastAssertApp(), "BroadcastTest", configs);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3cc2a05f/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
index b01f944..bc00305 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
@@ -20,7 +20,9 @@ package org.apache.samza.test.harness
import java.util.Properties
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestUtils, ZkUtils}
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.samza.system.kafka.KafkaSystemAdmin
/**
* LinkedIn integration test harness for Kafka
@@ -57,4 +59,8 @@ abstract class AbstractIntegrationTestHarness extends AbstractKafkaServerTestHar
*/
def bootstrapServers(): String = super.bootstrapUrl
+ def createSystemAdmin(system: String): KafkaSystemAdmin = {
+ new KafkaSystemAdmin(system, bootstrapServers, connectZk = () => ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled))
+ }
+
}
\ No newline at end of file