Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/17 18:41:53 UTC
[1/2] kafka git commit: KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id
Repository: kafka
Updated Branches:
refs/heads/trunk 9a836d015 -> 958e10c87
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8ff72bc..b3b6537 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -37,7 +37,7 @@ import java.util.Set;
public abstract class AbstractTask {
protected final TaskId id;
- protected final String jobId;
+ protected final String applicationId;
protected final ProcessorTopology topology;
protected final Consumer consumer;
protected final ProcessorStateManager stateMgr;
@@ -45,7 +45,7 @@ public abstract class AbstractTask {
protected ProcessorContext processorContext;
protected AbstractTask(TaskId id,
- String jobId,
+ String applicationId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
@@ -53,17 +53,17 @@ public abstract class AbstractTask {
StreamsConfig config,
boolean isStandby) {
this.id = id;
- this.jobId = jobId;
+ this.applicationId = applicationId;
this.partitions = new HashSet<>(partitions);
this.topology = topology;
this.consumer = consumer;
// create the processor state manager
try {
- File jobStateDir = StreamThread.makeStateDir(jobId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
- File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString());
+ File applicationStateDir = StreamThread.makeStateDir(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+ File stateFile = new File(applicationStateDir.getCanonicalPath(), id.toString());
// if partitions is null, this is a standby task
- this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
+ this.stateMgr = new ProcessorStateManager(applicationId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
throw new ProcessorStateException("Error while creating the state manager", e);
}
@@ -83,8 +83,8 @@ public abstract class AbstractTask {
return id;
}
- public final String jobId() {
- return jobId;
+ public final String applicationId() {
+ return applicationId;
}
public final Set<TopicPartition> partitions() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c4acc01..f6e43d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -27,15 +27,10 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
- private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
-
private final TaskId id;
private final StreamTask task;
private final StreamsMetrics metrics;
@@ -84,8 +79,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
}
@Override
- public String jobId() {
- return task.jobId();
+ public String applicationId() {
+ return task.applicationId();
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index c8f289e..df8516c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -51,7 +51,7 @@ public class ProcessorStateManager {
public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
public static final String LOCK_FILE_NAME = ".lock";
- private final String jobId;
+ private final String applicationId;
private final int defaultPartition;
private final Map<String, TopicPartition> partitionForTopic;
private final File baseDir;
@@ -65,8 +65,8 @@ public class ProcessorStateManager {
private final boolean isStandby;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
- public ProcessorStateManager(String jobId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
- this.jobId = jobId;
+ public ProcessorStateManager(String applicationId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+ this.applicationId = applicationId;
this.defaultPartition = defaultPartition;
this.partitionForTopic = new HashMap<>();
for (TopicPartition source : sources) {
@@ -104,8 +104,8 @@ public class ProcessorStateManager {
}
}
- public static String storeChangelogTopic(String jobId, String storeName) {
- return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
+ public static String storeChangelogTopic(String applicationId, String storeName) {
+ return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}
public static FileLock lockStateDirectory(File stateDir) throws IOException {
@@ -154,7 +154,7 @@ public class ProcessorStateManager {
// check that the underlying change log topic exist or not
String topic;
if (loggingEnabled)
- topic = storeChangelogTopic(this.jobId, store.name());
+ topic = storeChangelogTopic(this.applicationId, store.name());
else topic = store.name();
// block until the partition is ready for this state changelog topic or time has elapsed
@@ -325,7 +325,7 @@ public class ProcessorStateManager {
for (String storeName : stores.keySet()) {
TopicPartition part;
if (loggingEnabled.contains(storeName))
- part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+ part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
else
part = new TopicPartition(storeName, getPartition(storeName));
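
Note on the hunk above: storeChangelogTopic() is what derives the names of state changelog topics, so this rename changes the topic prefix from the old job ID to the application ID. A minimal sketch of the resulting naming, assuming STATE_CHANGELOG_TOPIC_SUFFIX is "-changelog" (the constant's value is not shown in this hunk):

// Sketch only; mirrors storeChangelogTopic() above, with the suffix value assumed.
public class ChangelogTopicNamingSketch {
    static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; // assumed value

    static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    public static void main(String[] args) {
        // An application "my-streams-app" with a store named "counts" logs to:
        System.out.println(storeChangelogTopic("my-streams-app", "counts"));
        // prints: my-streams-app-counts-changelog
    }
}
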
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 82633b4..0bcae18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -25,17 +25,13 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier {
- private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
-
private final TaskId id;
- private final String jobId;
+ private final String applicationId;
private final StreamsMetrics metrics;
private final ProcessorStateManager stateMgr;
@@ -47,12 +43,12 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
private boolean initialized;
public StandbyContextImpl(TaskId id,
- String jobId,
+ String applicationId,
StreamsConfig config,
ProcessorStateManager stateMgr,
StreamsMetrics metrics) {
this.id = id;
- this.jobId = jobId;
+ this.applicationId = applicationId;
this.metrics = metrics;
this.stateMgr = stateMgr;
@@ -78,8 +74,8 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
}
@Override
- public String jobId() {
- return jobId;
+ public String applicationId() {
+ return applicationId;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index da454cb..f19d5a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@@ -36,15 +34,13 @@ import java.util.Map;
*/
public class StandbyTask extends AbstractTask {
- private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
-
private final Map<TopicPartition, Long> checkpointedOffsets;
/**
* Create {@link StandbyTask} with its assigned partitions
*
* @param id the ID of this task
- * @param jobId the ID of the job
+ * @param applicationId the ID of the stream processing application
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
@@ -53,17 +49,17 @@ public class StandbyTask extends AbstractTask {
* @param metrics the {@link StreamsMetrics} created by the thread
*/
public StandbyTask(TaskId id,
- String jobId,
+ String applicationId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
StreamsConfig config,
StreamsMetrics metrics) {
- super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
+ super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, true);
// initialize the topology with its own context
- this.processorContext = new StandbyContextImpl(id, jobId, config, stateMgr, metrics);
+ this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
initializeStateStores();
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 266df3e..a6b82af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
streamThread = (StreamThread) o;
streamThread.partitionAssignor(this);
- this.topicGroups = streamThread.builder.topicGroups(streamThread.jobId);
+ this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
internalTopicManager = new InternalTopicManager(
@@ -445,7 +445,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
/* For Test Only */
public Set<TaskId> tasksForState(String stateName) {
- return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.jobId, stateName));
+ return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName));
}
public Set<TaskId> tasksForPartition(TopicPartition partition) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4d66324..54a25c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -61,7 +61,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
* Create {@link StreamTask} with its assigned partitions
*
* @param id the ID of this task
- * @param jobId the ID of the job
+ * @param applicationId the ID of the stream processing application
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
@@ -71,7 +71,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
* @param metrics the {@link StreamsMetrics} created by the thread
*/
public StreamTask(TaskId id,
- String jobId,
+ String applicationId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
@@ -79,7 +79,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
Consumer<byte[], byte[]> restoreConsumer,
StreamsConfig config,
StreamsMetrics metrics) {
- super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
+ super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, false);
this.punctuationQueue = new PunctuationQueue();
this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e9343e0..491c812 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -72,7 +72,7 @@ public class StreamThread extends Thread {
private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
public final PartitionGrouper partitionGrouper;
- public final String jobId;
+ public final String applicationId;
public final String clientId;
public final UUID processId;
@@ -106,12 +106,12 @@ public class StreamThread extends Thread {
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
private boolean processStandbyRecords = false;
- static File makeStateDir(String jobId, String baseDirName) {
+ static File makeStateDir(String applicationId, String baseDirName) {
File baseDir = new File(baseDirName);
if (!baseDir.exists())
baseDir.mkdir();
- File stateDir = new File(baseDir, jobId);
+ File stateDir = new File(baseDir, applicationId);
if (!stateDir.exists())
stateDir.mkdir();
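
Note on makeStateDir() above: the local state directory is now keyed by the application ID rather than the job ID, with one subdirectory per task (see the AbstractTask hunk earlier). A runnable sketch of the resulting layout, using hypothetical values for state.dir, application.id, and the task ID:

import java.io.File;

public class StateDirLayoutSketch {
    public static void main(String[] args) {
        File baseDir = new File("/tmp/kafka-streams");      // StreamsConfig.STATE_DIR_CONFIG
        File appDir = new File(baseDir, "my-streams-app");  // named by application.id (was job.id)
        File taskDir = new File(appDir, "0_1");             // per-task directory, TaskId.toString()
        System.out.println(taskDir.getPath());              // /tmp/kafka-streams/my-streams-app/0_1
    }
}
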
@@ -150,12 +150,12 @@ public class StreamThread extends Thread {
public StreamThread(TopologyBuilder builder,
StreamsConfig config,
- String jobId,
+ String applicationId,
String clientId,
UUID processId,
Metrics metrics,
Time time) {
- this(builder, config, null , null, null, jobId, clientId, processId, metrics, time);
+ this(builder, config, null , null, null, applicationId, clientId, processId, metrics, time);
}
StreamThread(TopologyBuilder builder,
@@ -163,17 +163,17 @@ public class StreamThread extends Thread {
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
- String jobId,
+ String applicationId,
String clientId,
UUID processId,
Metrics metrics,
Time time) {
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
- this.jobId = jobId;
+ this.applicationId = applicationId;
this.config = config;
this.builder = builder;
- this.sourceTopics = builder.sourceTopics(jobId);
+ this.sourceTopics = builder.sourceTopics(applicationId);
this.clientId = clientId;
this.processId = processId;
this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -194,7 +194,7 @@ public class StreamThread extends Thread {
this.standbyRecords = new HashMap<>();
// read in task specific config values
- this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
+ this.stateDir = makeStateDir(this.applicationId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -224,7 +224,7 @@ public class StreamThread extends Thread {
private Consumer<byte[], byte[]> createConsumer() {
String threadName = this.getName();
log.info("Creating consumer client for stream thread [" + threadName + "]");
- return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId + "-" + threadName),
+ return new KafkaConsumer<>(config.getConsumerConfigs(this, this.applicationId, this.clientId + "-" + threadName),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
@@ -580,9 +580,9 @@ public class StreamThread extends Thread {
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
- ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
+ ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
- return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
+ return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
}
private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -650,10 +650,10 @@ public class StreamThread extends Thread {
protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
- ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
+ ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
if (!topology.stateStoreSuppliers().isEmpty()) {
- return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
+ return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors);
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index aac4d85..4229f94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -57,7 +57,7 @@ public class StoreChangeLogger<K, V> {
}
protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
- this.topic = ProcessorStateManager.storeChangelogTopic(context.jobId(), storeName);
+ this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
this.context = context;
this.partition = partition;
this.serialization = serialization;
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f0276ab..83ebe48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -41,7 +41,7 @@ public class StreamsConfigTest {
@Before
public void setUp() {
- props.put(StreamsConfig.JOB_ID_CONFIG, "streams-config-test");
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -59,9 +59,10 @@ public class StreamsConfigTest {
@Test
public void testGetConsumerConfigs() throws Exception {
- Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
+ Map<String, Object> returnedProps =
+ streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-application", "client");
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
- assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
+ assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 14cb493..1d0a969 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -183,12 +183,12 @@ public class ProcessorStateManagerTest {
}
private final Set<TopicPartition> noPartitions = Collections.emptySet();
- private final String jobId = "test-job";
+ private final String applicationId = "test-application";
private final String stateDir = "test";
private final String persistentStoreName = "persistentStore";
private final String nonPersistentStoreName = "nonPersistentStore";
- private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, persistentStoreName);
- private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, nonPersistentStoreName);
+ private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
+ private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
@Test
public void testLockStateDirectory() throws IOException {
@@ -197,7 +197,7 @@ public class ProcessorStateManagerTest {
FileLock lock;
// the state manager locks the directory
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
try {
// this should not get the lock
@@ -226,7 +226,7 @@ public class ProcessorStateManagerTest {
try {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
try {
stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
} finally {
@@ -258,7 +258,7 @@ public class ProcessorStateManagerTest {
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
try {
restoreConsumer.reset();
@@ -311,7 +311,7 @@ public class ProcessorStateManagerTest {
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
try {
restoreConsumer.reset();
@@ -351,9 +351,9 @@ public class ProcessorStateManagerTest {
String storeName2 = "store2";
String storeName3 = "store3";
- String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
- String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
- String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(jobId, storeName3);
+ String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+ String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
+ String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
@@ -386,7 +386,7 @@ public class ProcessorStateManagerTest {
// if there is an source partition, inherit the partition id
Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
+ ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
try {
restoreConsumer.reset();
@@ -425,7 +425,7 @@ public class ProcessorStateManagerTest {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
try {
stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
@@ -462,12 +462,12 @@ public class ProcessorStateManagerTest {
HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
- ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(jobId, "otherTopic"), 1), 789L);
+ ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
- ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
try {
// make sure the checkpoint file is deleted
assertFalse(checkpointFile.exists());
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index c8115b8..12210cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -67,7 +67,7 @@ public class ProcessorTopologyTest {
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
File localState = StateTestUtils.tempDir();
Properties props = new Properties();
- props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 295f0dd..21bdaff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -57,11 +57,11 @@ public class StandbyTaskTest {
private final Serializer<Integer> intSerializer = new IntegerSerializer();
- private final String jobId = "test-job";
+ private final String applicationId = "test-application";
private final String storeName1 = "store1";
private final String storeName2 = "store2";
- private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
- private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
+ private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+ private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
@@ -94,7 +94,7 @@ public class StandbyTaskTest {
setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
+ setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
@@ -133,7 +133,7 @@ public class StandbyTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamsConfig config = createConfig(baseDir);
- StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+ StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
@@ -148,7 +148,7 @@ public class StandbyTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamsConfig config = createConfig(baseDir);
- StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+ StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -167,7 +167,7 @@ public class StandbyTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamsConfig config = createConfig(baseDir);
- StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+ StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -201,7 +201,7 @@ public class StandbyTaskTest {
task.close();
- File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
+ File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
Map<TopicPartition, Long> offsets = checkpoint.read();
@@ -230,7 +230,7 @@ public class StandbyTaskTest {
));
StreamsConfig config = createConfig(baseDir);
- StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
+ StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -299,7 +299,7 @@ public class StandbyTaskTest {
task.close();
- File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
+ File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
Map<TopicPartition, Long> offsets = checkpoint.read();
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 7f37bda..a5990bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -94,7 +94,7 @@ public class StreamPartitionAssignorTest {
setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-partition-assignor-test");
+ setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1f401db..f2ade6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -78,7 +78,7 @@ public class StreamTaskTest {
setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test");
+ setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
@@ -105,7 +105,7 @@ public class StreamTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamsConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+ StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -156,7 +156,7 @@ public class StreamTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamsConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+ StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index eaaf842..b201c07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -60,7 +60,7 @@ import java.util.UUID;
public class StreamThreadTest {
private final String clientId = "clientId";
- private final String jobId = "stream-thread-test";
+ private final String applicationId = "stream-thread-test";
private final UUID processId = UUID.randomUUID();
private TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -118,7 +118,7 @@ public class StreamThreadTest {
setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
+ setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
}
@@ -129,14 +129,14 @@ public class StreamThreadTest {
public boolean committed = false;
public TestStreamTask(TaskId id,
- String jobId,
+ String applicationId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
StreamsConfig config) {
- super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null);
+ super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null);
}
@Override
@@ -168,11 +168,11 @@ public class StreamThreadTest {
builder.addSource("source3", "topic3");
builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
+ StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId, processId, new Metrics(), new SystemTime()) {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build("X", id.topicGroupId);
- return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+ return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
@@ -271,12 +271,12 @@ public class StreamThreadTest {
StreamsConfig config = new StreamsConfig(props);
- File jobDir = new File(baseDir, jobId);
- jobDir.mkdir();
- File stateDir1 = new File(jobDir, task1.toString());
- File stateDir2 = new File(jobDir, task2.toString());
- File stateDir3 = new File(jobDir, task3.toString());
- File extraDir = new File(jobDir, "X");
+ File applicationDir = new File(baseDir, applicationId);
+ applicationDir.mkdir();
+ File stateDir1 = new File(applicationDir, task1.toString());
+ File stateDir2 = new File(applicationDir, task2.toString());
+ File stateDir3 = new File(applicationDir, task3.toString());
+ File extraDir = new File(applicationDir, "X");
stateDir1.mkdir();
stateDir2.mkdir();
stateDir3.mkdir();
@@ -290,7 +290,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) {
+ StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId, processId, new Metrics(), mockTime) {
@Override
public void maybeClean() {
super.maybeClean();
@@ -299,7 +299,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build("X", id.topicGroupId);
- return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+ return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
@@ -412,7 +412,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) {
+ StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId, processId, new Metrics(), mockTime) {
@Override
public void maybeCommit() {
super.maybeCommit();
@@ -421,7 +421,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build("X", id.topicGroupId);
- return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+ return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
@@ -482,7 +482,7 @@ public class StreamThreadTest {
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
+ partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));
Map<String, PartitionAssignor.Assignment> assignments =
partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 6cb45f3..063eafe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -81,7 +81,7 @@ public class SmokeTestClient extends SmokeTestUtil {
private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "SmokeTest");
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index d597fd2..b463669 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -106,8 +106,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
}
@Override
- public String jobId() {
- return "mockJob";
+ public String applicationId() {
+ return "mockApplication";
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index cf17dbe..a2948a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -126,7 +126,7 @@ public class ProcessorTopologyTestDriver {
private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
- private final String jobId = "test-driver-job";
+ private final String applicationId = "test-driver-application";
private final TaskId id;
private final ProcessorTopology topology;
@@ -167,7 +167,7 @@ public class ProcessorTopologyTestDriver {
}
task = new StreamTask(id,
- jobId,
+ applicationId,
partitionsByTopic.values(),
topology,
consumer,
@@ -334,7 +334,7 @@ public class ProcessorTopologyTestDriver {
};
// For each store name ...
for (String storeName : storeNames) {
- String topicName = ProcessorStateManager.storeChangelogTopic(jobId, storeName);
+ String topicName = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
// Set up the restore-state topic ...
// consumer.subscribe(new TopicPartition(topicName, 1));
// Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
[2/2] kafka git commit: KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id
Posted by gu...@apache.org.
KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id
guozhangwang ymatsuda : please review.
Author: Michael G. Noll <mi...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1081 from miguno/KAFKA-3411
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/958e10c8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/958e10c8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/958e10c8
Branch: refs/heads/trunk
Commit: 958e10c87ce293c3bf59bb9840eaaae915eff25e
Parents: 9a836d0
Author: Michael G. Noll <mi...@confluent.io>
Authored: Thu Mar 17 10:41:48 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 17 10:41:48 2016 -0700
----------------------------------------------------------------------
.../pageview/JsonTimestampExtractor.java | 8 +-
.../examples/pageview/PageViewTypedDemo.java | 180 ++++++++++++++++++
.../examples/pageview/PageViewTypedJob.java | 184 -------------------
.../examples/pageview/PageViewUntypedDemo.java | 136 ++++++++++++++
.../examples/pageview/PageViewUntypedJob.java | 140 --------------
.../kafka/streams/examples/pipe/PipeDemo.java | 65 +++++++
.../kafka/streams/examples/pipe/PipeJob.java | 65 -------
.../examples/wordcount/WordCountDemo.java | 96 ++++++++++
.../examples/wordcount/WordCountJob.java | 96 ----------
.../wordcount/WordCountProcessorDemo.java | 137 ++++++++++++++
.../wordcount/WordCountProcessorJob.java | 137 --------------
.../org/apache/kafka/streams/KafkaStreams.java | 12 +-
.../org/apache/kafka/streams/StreamsConfig.java | 14 +-
.../streams/processor/PartitionGrouper.java | 8 +-
.../streams/processor/ProcessorContext.java | 6 +-
.../streams/processor/TopologyBuilder.java | 42 ++---
.../processor/internals/AbstractTask.java | 16 +-
.../internals/ProcessorContextImpl.java | 9 +-
.../internals/ProcessorStateManager.java | 14 +-
.../processor/internals/StandbyContextImpl.java | 14 +-
.../processor/internals/StandbyTask.java | 12 +-
.../internals/StreamPartitionAssignor.java | 4 +-
.../streams/processor/internals/StreamTask.java | 6 +-
.../processor/internals/StreamThread.java | 28 +--
.../state/internals/StoreChangeLogger.java | 2 +-
.../apache/kafka/streams/StreamsConfigTest.java | 7 +-
.../internals/ProcessorStateManagerTest.java | 28 +--
.../internals/ProcessorTopologyTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 20 +-
.../internals/StreamPartitionAssignorTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 6 +-
.../processor/internals/StreamThreadTest.java | 34 ++--
.../streams/smoketest/SmokeTestClient.java | 2 +-
.../apache/kafka/test/MockProcessorContext.java | 4 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 6 +-
35 files changed, 762 insertions(+), 780 deletions(-)
----------------------------------------------------------------------
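
For users of the Streams API, the visible effect of this commit is the config key rename from JOB_ID_CONFIG ("job.id") to APPLICATION_ID_CONFIG ("application.id"); as the StreamsConfigTest hunk shows, this value also serves as the underlying consumer group.id. A minimal migration sketch (the application ID and broker address are hypothetical):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ApplicationIdMigrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Before this commit: props.put(StreamsConfig.JOB_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // also used as the consumer group.id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    }
}
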
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 6443193..63e8377 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -29,12 +29,12 @@ public class JsonTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record) {
- if (record.value() instanceof PageViewTypedJob.PageView) {
- return ((PageViewTypedJob.PageView) record.value()).timestamp;
+ if (record.value() instanceof PageViewTypedDemo.PageView) {
+ return ((PageViewTypedDemo.PageView) record.value()).timestamp;
}
- if (record.value() instanceof PageViewTypedJob.UserProfile) {
- return ((PageViewTypedJob.UserProfile) record.value()).timestamp;
+ if (record.value() instanceof PageViewTypedDemo.UserProfile) {
+ return ((PageViewTypedDemo.UserProfile) record.value()).timestamp;
}
if (record.value() instanceof JsonNode) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
new file mode 100644
index 0000000..4f9de29
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) read from a topic named "streams-pageview-input"
+ * with a user profile table read from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topics (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to them (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewTypedDemo {
+
+ // POJO classes
+ public static class PageView {
+ public String user;
+ public String page;
+ public Long timestamp;
+ }
+
+ public static class UserProfile {
+ public String region;
+ public Long timestamp;
+ }
+
+ public static class PageViewByRegion {
+ public String user;
+ public String page;
+ public String region;
+ }
+
+ public static class WindowedPageViewByRegion {
+ public long windowStart;
+ public String region;
+ }
+
+ public static class RegionCount {
+ public long count;
+ public String region;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+ props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ final Serializer<String> stringSerializer = new StringSerializer();
+ final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+ // TODO: the following can be removed with a serialization factory
+ Map<String, Object> serdeProps = new HashMap<>();
+
+ final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
+ serdeProps.put("JsonPOJOClass", PageView.class);
+ pageViewDeserializer.configure(serdeProps, false);
+
+ final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
+ serdeProps.put("JsonPOJOClass", UserProfile.class);
+ userProfileDeserializer.configure(serdeProps, false);
+
+ final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
+ serdeProps.put("JsonPOJOClass", UserProfile.class);
+ userProfileSerializer.configure(serdeProps, false);
+
+ final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
+ serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
+ wPageViewByRegionSerializer.configure(serdeProps, false);
+
+ final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
+ serdeProps.put("JsonPOJOClass", RegionCount.class);
+ regionCountSerializer.configure(serdeProps, false);
+
+ KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
+
+ KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
+
+ KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
+ .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+ @Override
+ public PageViewByRegion apply(PageView view, UserProfile profile) {
+ PageViewByRegion viewByRegion = new PageViewByRegion();
+ viewByRegion.user = view.user;
+ viewByRegion.page = view.page;
+
+ if (profile != null) {
+ viewByRegion.region = profile.region;
+ } else {
+ viewByRegion.region = "UNKNOWN";
+ }
+ return viewByRegion;
+ }
+ })
+ .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+ @Override
+ public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+ return new KeyValue<>(viewRegion.region, viewRegion);
+ }
+ })
+ .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), // window size: one week in milliseconds
+ stringSerializer, stringDeserializer)
+ // TODO: we can merge this toStream().map(...) into a single toStream(...)
+ .toStream()
+ .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
+ @Override
+ public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
+ WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
+ wViewByRegion.windowStart = key.window().start();
+ wViewByRegion.region = key.value();
+
+ RegionCount rCount = new RegionCount();
+ rCount.region = key.value();
+ rCount.count = value;
+
+ return new KeyValue<>(wViewByRegion, rCount);
+ }
+ });
+
+ // write to the result topic
+ regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
+
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
+
+ // usually the stream application would be running forever;
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
+ }
+}
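The five serializer/deserializer blocks above repeat the same "JsonPOJOClass" wiring that the in-code TODO wants to fold into a serialization factory. A minimal sketch of such helpers (hypothetical method names, not part of this patch; they merely package the configure(...) pattern already used above):

    // Hypothetical helpers; JsonPOJOSerializer/JsonPOJODeserializer are the
    // example classes from this package, configured exactly as in main() above.
    private static <T> Serializer<T> jsonPojoSerializer(Class<T> pojoClass) {
        Map<String, Object> serdeProps = new HashMap<>();
        serdeProps.put("JsonPOJOClass", pojoClass);
        Serializer<T> serializer = new JsonPOJOSerializer<>();
        serializer.configure(serdeProps, false); // false: configuring a value serde
        return serializer;
    }

    private static <T> Deserializer<T> jsonPojoDeserializer(Class<T> pojoClass) {
        Map<String, Object> serdeProps = new HashMap<>();
        serdeProps.put("JsonPOJOClass", pojoClass);
        Deserializer<T> deserializer = new JsonPOJODeserializer<>();
        deserializer.configure(serdeProps, false);
        return deserializer;
    }

With helpers like these, each block collapses to a single call such as jsonPojoDeserializer(PageView.class).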
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
deleted file mode 100644
index 1fcb403..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewTypedJob {
-
- // POJO classes
- static public class PageView {
- public String user;
- public String page;
- public Long timestamp;
- }
-
- static public class UserProfile {
- public String region;
- public Long timestamp;
- }
-
- static public class PageViewByRegion {
- public String user;
- public String page;
- public String region;
- }
-
- static public class WindowedPageViewByRegion {
- public long windowStart;
- public String region;
- }
-
- static public class RegionCount {
- public long count;
- public String region;
- }
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
- props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
- props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-
- // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
- props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- KStreamBuilder builder = new KStreamBuilder();
-
- final Serializer<String> stringSerializer = new StringSerializer();
- final Deserializer<String> stringDeserializer = new StringDeserializer();
- final Serializer<Long> longSerializer = new LongSerializer();
- final Deserializer<Long> longDeserializer = new LongDeserializer();
-
- // TODO: the following can be removed with a serialization factory
- Map<String, Object> serdeProps = new HashMap<>();
-
- final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
- serdeProps.put("JsonPOJOClass", PageView.class);
- pageViewDeserializer.configure(serdeProps, false);
-
- final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
- serdeProps.put("JsonPOJOClass", UserProfile.class);
- userProfileDeserializer.configure(serdeProps, false);
-
- final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", UserProfile.class);
- userProfileSerializer.configure(serdeProps, false);
-
- final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
- wPageViewByRegionSerializer.configure(serdeProps, false);
-
- final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", RegionCount.class);
- regionCountSerializer.configure(serdeProps, false);
-
- KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
-
- KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
-
- KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
- .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
- @Override
- public PageViewByRegion apply(PageView view, UserProfile profile) {
- PageViewByRegion viewByRegion = new PageViewByRegion();
- viewByRegion.user = view.user;
- viewByRegion.page = view.page;
-
- if (profile != null) {
- viewByRegion.region = profile.region;
- } else {
- viewByRegion.region = "UNKNOWN";
- }
- return viewByRegion;
- }
- })
- .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
- @Override
- public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
- return new KeyValue<>(viewRegion.region, viewRegion);
- }
- })
- .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
- stringSerializer, stringDeserializer)
- // TODO: we can merge ths toStream().map(...) with a single toStream(...)
- .toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
- @Override
- public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
- WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
- wViewByRegion.windowStart = key.window().start();
- wViewByRegion.region = key.value();
-
- RegionCount rCount = new RegionCount();
- rCount.region = key.value();
- rCount.count = value;
-
- return new KeyValue<>(wViewByRegion, rCount);
- }
- });
-
- // write to the result topic
- regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
-
- KafkaStreams streams = new KafkaStreams(builder, props);
- streams.start();
-
- // usually the streaming job would be ever running,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
-
- streams.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
new file mode 100644
index 0000000..9377095
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) read from a topic named "streams-pageview-input"
+ * with a user profile table read from a topic named "streams-userprofile-input", where the data format
+ * is a JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topics (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to them (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewUntypedDemo {
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+ props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ final Serializer<String> stringSerializer = new StringSerializer();
+ final Deserializer<String> stringDeserializer = new StringDeserializer();
+ final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
+ final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
+
+ KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
+
+ KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
+
+ KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
+ @Override
+ public String apply(JsonNode record) {
+ return record.get("region").textValue();
+ }
+ });
+
+ KStream<JsonNode, JsonNode> regionCount = views
+ .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+ @Override
+ public JsonNode apply(JsonNode view, String region) {
+ ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+ return jNode.put("user", view.get("user").textValue())
+ .put("page", view.get("page").textValue())
+ .put("region", region == null ? "UNKNOWN" : region);
+ }
+ })
+ .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+ @Override
+ public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+ return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+ }
+ })
+ .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), // window size: one week in milliseconds
+ stringSerializer, stringDeserializer)
+ // TODO: we can merge this toStream().map(...) into a single toStream(...)
+ .toStream()
+ .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
+ @Override
+ public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
+ ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
+ keyNode.put("window-start", key.window().start())
+ .put("region", key.value());
+
+ ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
+ valueNode.put("count", value);
+
+ return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
+ }
+ });
+
+ // write to the result topic
+ regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
+
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
+
+ // usually the stream application would be running forever;
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
+ }
+}
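The javadoc above points to the console producer for seeding the two input topics. An equivalent plain-Java snippet (illustrative only, not part of this patch) using the standard producer client from org.apache.kafka.clients.producer; note the "timestamp" field, which JsonTimestampExtractor expects on every record:

    // Illustrative seeding code; topic names and the JSON layout follow the demo above.
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
    // write the profile first so the table side of the join already has a region
    producer.send(new ProducerRecord<>("streams-userprofile-input", "alice",
            "{\"region\": \"europe\", \"timestamp\": 1458000000000}"));
    producer.send(new ProducerRecord<>("streams-pageview-input", "alice",
            "{\"user\": \"alice\", \"page\": \"index.html\", \"timestamp\": 1458000000001}"));
    producer.close();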
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
deleted file mode 100644
index fb1a55d..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pageview;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
-import org.apache.kafka.connect.json.JsonDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.util.Properties;
-
-/**
- * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
- * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
- * in Kafka Streams.
- *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
- * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
- * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PageViewUntypedJob {
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
- props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
- props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
-
- // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
- props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- KStreamBuilder builder = new KStreamBuilder();
-
- final Serializer<String> stringSerializer = new StringSerializer();
- final Deserializer<String> stringDeserializer = new StringDeserializer();
- final Serializer<Long> longSerializer = new LongSerializer();
- final Deserializer<Long> longDeserializer = new LongDeserializer();
- final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
- final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
-
- KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
-
- KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
-
- KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
- @Override
- public String apply(JsonNode record) {
- return record.get("region").textValue();
- }
- });
-
- KStream<JsonNode, JsonNode> regionCount = views
- .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
- @Override
- public JsonNode apply(JsonNode view, String region) {
- ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
- return jNode.put("user", view.get("user").textValue())
- .put("page", view.get("page").textValue())
- .put("region", region == null ? "UNKNOWN" : region);
- }
- })
- .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
- @Override
- public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
- return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
- }
- })
- .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
- stringSerializer, stringDeserializer)
- // TODO: we can merge ths toStream().map(...) with a single toStream(...)
- .toStream()
- .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
- @Override
- public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
- ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
- keyNode.put("window-start", key.window().start())
- .put("region", key.value());
-
- ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
- valueNode.put("count", value);
-
- return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
- }
- });
-
- // write to the result topic
- regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
-
- KafkaStreams streams = new KafkaStreams(builder, props);
- streams.start();
-
- // usually the streaming job would be ever running,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
-
- streams.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
new file mode 100644
index 0000000..c37c68a
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pipe;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
+ * write data to a sink (output) topic.
+ *
+ * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
+ * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PipeDemo {
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+ props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ builder.stream("streams-file-input").to("streams-pipe-output");
+
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
+
+ // usually the stream application would be running forever;
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
deleted file mode 100644
index 8885ca2..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.pipe;
-
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Properties;
-
-/**
- * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
- * write data to a sink (output) topic.
- *
- * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
- * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class PipeJob {
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
- // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
- props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- KStreamBuilder builder = new KStreamBuilder();
-
- builder.stream("streams-file-input").to("streams-pipe-output");
-
- KafkaStreams streams = new KafkaStreams(builder, props);
- streams.start();
-
- // usually the streaming job would be ever running,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
-
- streams.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
new file mode 100644
index 0000000..03d5142
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class WordCountDemo {
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+ props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ final Serializer<String> stringSerializer = new StringSerializer();
+ final Deserializer<String> stringDeserializer = new StringDeserializer();
+ final Serializer<Long> longSerializer = new LongSerializer();
+
+ KStream<String, String> source = builder.stream("streams-file-input");
+
+ KTable<String, Long> counts = source
+ .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+ @Override
+ public Iterable<String> apply(String value) {
+ return Arrays.asList(value.toLowerCase().split(" "));
+ }
+ }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+ @Override
+ public KeyValue<String, String> apply(String key, String value) {
+ return new KeyValue<String, String>(value, value);
+ }
+ })
+ .countByKey(stringSerializer, stringDeserializer, "Counts");
+
+ counts.to("streams-wordcount-output", stringSerializer, longSerializer);
+
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
+
+ // usually the stream application would be running forever;
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
+ }
+}
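Since countByKey maintains a running count, "streams-wordcount-output" receives an updated record per word occurrence rather than one final tally. A plain consumer (illustrative only, not part of this patch) can tail the output with matching deserializers:

    // Illustrative verification code; uses org.apache.kafka.clients.consumer
    // and java.util.Collections.
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "streams-wordcount-inspector"); // hypothetical group name
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

    KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
    for (ConsumerRecord<String, Long> record : consumer.poll(5000L))
        System.out.println(record.key() + " -> " + record.value()); // prints each word with its latest count
    consumer.close();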
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
deleted file mode 100644
index 2b51a44..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.wordcount;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueMapper;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-/**
- * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
- * that computes a simple word occurrence histogram from an input text.
- *
- * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
- * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
- * is an updated count of a single word.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class WordCountJob {
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
- props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
- // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
- props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- KStreamBuilder builder = new KStreamBuilder();
-
- final Serializer<String> stringSerializer = new StringSerializer();
- final Deserializer<String> stringDeserializer = new StringDeserializer();
- final Serializer<Long> longSerializer = new LongSerializer();
-
- KStream<String, String> source = builder.stream("streams-file-input");
-
- KTable<String, Long> counts = source
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.toLowerCase().split(" "));
- }
- }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
- @Override
- public KeyValue<String, String> apply(String key, String value) {
- return new KeyValue<String, String>(value, value);
- }
- })
- .countByKey(stringSerializer, stringDeserializer, "Counts");
-
- counts.to("streams-wordcount-output", stringSerializer, longSerializer);
-
- KafkaStreams streams = new KafkaStreams(builder, props);
- streams.start();
-
- // usually the streaming job would be ever running,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
-
- streams.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
new file mode 100644
index 0000000..b651b3a
--- /dev/null
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.wordcount;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Properties;
+
+/**
+ * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class WordCountProcessorDemo {
+
+ private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+
+ @Override
+ public Processor<String, String> get() {
+ return new Processor<String, String>() {
+ private ProcessorContext context;
+ private KeyValueStore<String, Integer> kvStore;
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void init(ProcessorContext context) {
+ this.context = context;
+ this.context.schedule(1000); // request punctuate() roughly every 1000 milliseconds
+ this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+ }
+
+ @Override
+ public void process(String dummy, String line) {
+ String[] words = line.toLowerCase().split(" ");
+
+ for (String word : words) {
+ Integer oldValue = this.kvStore.get(word);
+
+ if (oldValue == null) {
+ this.kvStore.put(word, 1);
+ } else {
+ this.kvStore.put(word, oldValue + 1);
+ }
+ }
+
+ context.commit();
+ }
+
+ @Override
+ public void punctuate(long timestamp) {
+ KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+ System.out.println("----------- " + timestamp + " ----------- ");
+
+ while (iter.hasNext()) {
+ KeyValue<String, Integer> entry = iter.next();
+
+ System.out.println("[" + entry.key + ", " + entry.value + "]");
+
+ context.forward(entry.key, entry.value.toString());
+ }
+
+ iter.close();
+ }
+
+ @Override
+ public void close() {
+ this.kvStore.close();
+ }
+ };
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+ props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSource("Source", "streams-file-input");
+
+ builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
+ builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
+
+ builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
+
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
+
+ // usually the stream application would be running forever;
+ // in this example we just let it run for some time and stop since the input data is finite.
+ Thread.sleep(5000L);
+
+ streams.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
deleted file mode 100644
index cb82656..0000000
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples.wordcount;
-
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-
-import java.util.Properties;
-
-/**
- * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
- * that computes a simple word occurrence histogram from an input text.
- *
- * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
- * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
- * is an updated count of a single word.
- *
- * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
- * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
- */
-public class WordCountProcessorJob {
-
- private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
-
- @Override
- public Processor<String, String> get() {
- return new Processor<String, String>() {
- private ProcessorContext context;
- private KeyValueStore<String, Integer> kvStore;
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(ProcessorContext context) {
- this.context = context;
- this.context.schedule(1000);
- this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
- }
-
- @Override
- public void process(String dummy, String line) {
- String[] words = line.toLowerCase().split(" ");
-
- for (String word : words) {
- Integer oldValue = this.kvStore.get(word);
-
- if (oldValue == null) {
- this.kvStore.put(word, 1);
- } else {
- this.kvStore.put(word, oldValue + 1);
- }
- }
-
- context.commit();
- }
-
- @Override
- public void punctuate(long timestamp) {
- KeyValueIterator<String, Integer> iter = this.kvStore.all();
-
- System.out.println("----------- " + timestamp + " ----------- ");
-
- while (iter.hasNext()) {
- KeyValue<String, Integer> entry = iter.next();
-
- System.out.println("[" + entry.key + ", " + entry.value + "]");
-
- context.forward(entry.key, entry.value.toString());
- }
-
- iter.close();
- }
-
- @Override
- public void close() {
- this.kvStore.close();
- }
- };
- }
- }
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
- props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
- // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
- props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("Source", "streams-file-input");
-
- builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
- builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
-
- builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
-
- KafkaStreams streams = new KafkaStreams(builder, props);
- streams.start();
-
- // usually the streaming job would be ever running,
- // in this example we just let it run for some time and stop since the input data is finite.
- Thread.sleep(5000L);
-
- streams.close();
- }
-}
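
This commit deletes the "Job"-suffixed demo class; a renamed counterpart (presumably WordCountProcessorDemo, added elsewhere in this patch) would carry the same topology under the new terminology. A minimal sketch of the updated configuration, assuming only the config-key rename visible in this diff:

    Properties props = new Properties();
    // application.id replaces the removed job.id key
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
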
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 15d6d8b..20958e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or
* more threads specified in the configs for the processing work.
* <p>
- * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same job ID (whether in this same process, on other processes
+ * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes
* on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work
* based on the assignment of the input topic partitions so that all partitions are being
* consumed. If instances are added or fail, all instances will rebalance the partition assignment among themselves
@@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* A simple example might look like this:
* <pre>
* Map<String, Object> props = new HashMap<>();
- * props.put(StreamsConfig.JOB_ID_CONFIG, "my-job");
+ * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
* props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
* props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
* props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -113,12 +113,12 @@ public class KafkaStreams {
this.processId = UUID.randomUUID();
- // JobId is a required config and hence should always have value
- String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG);
+ // The application ID is a required config and hence should always have a value
+ String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
- clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+ clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
@@ -132,7 +132,7 @@ public class KafkaStreams {
this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
for (int i = 0; i < this.threads.length; i++) {
- this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
+ this.threads[i] = new StreamThread(builder, config, applicationId, clientId, processId, metrics, time);
}
}
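
The javadoc example above is cut off by the hunk; a complete usage sketch under the new naming, assembled only from pieces visible in this patch (topic names are illustrative, not from the commit):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class ApplicationIdExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // instances sharing this application.id co-ordinate as one
            // (possibly distributed) stream processing client
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("Source", "some-input-topic");
            builder.addSink("Sink", "some-output-topic", "Source");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
            Thread.sleep(5000L);  // demo only; a real application runs until shutdown
            streams.close();
        }
    }
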
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index c4b8ffe..52fdbd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -83,13 +83,13 @@ public class StreamsConfig extends AbstractConfig {
public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
- /** <code>job.id</code> */
- public static final String JOB_ID_CONFIG = "job.id";
- public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
+ /** <code>application.id</code> */
+ public static final String APPLICATION_ID_CONFIG = "application.id";
+ public static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
/** <code>replication.factor</code> */
public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
- public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the job.";
+ public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
/** <code>key.serializer</code> */
public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -124,10 +124,10 @@ public class StreamsConfig extends AbstractConfig {
private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor";
static {
- CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value
+ CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value
Type.STRING,
Importance.HIGH,
- StreamsConfig.JOB_ID_DOC)
+ StreamsConfig.APPLICATION_ID_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
Type.STRING,
Importance.HIGH,
@@ -297,7 +297,7 @@ public class StreamsConfig extends AbstractConfig {
}
private void removeStreamsSpecificConfigs(Map<String, Object> props) {
- props.remove(StreamsConfig.JOB_ID_CONFIG);
+ props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
props.remove(StreamsConfig.STATE_DIR_CONFIG);
props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
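
Since the ConfigDef above marks application.id as required with no default value, constructing a config without it should fail fast. A small sketch (assuming the failure surfaces as the client's usual ConfigException):

    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try {
        new StreamsConfig(props);  // no application.id set
    } catch (org.apache.kafka.common.config.ConfigException e) {
        // required value missing -> validation error, assumed to name "application.id"
        System.err.println("invalid config: " + e.getMessage());
    }
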
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index ae9844d..0c94084 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -28,7 +28,8 @@ import java.util.Set;
*
* This grouper also acts as the stream task creation function along with partition distribution
* such that each generated partition group is assigned with a distinct {@link TaskId};
- * the created task ids will then be assigned to Kafka Streams instances that host the stream job.
+ * the created task ids will then be assigned to Kafka Streams instances that host the stream
+ * processing application.
*/
public interface PartitionGrouper {
@@ -37,9 +38,10 @@ public interface PartitionGrouper {
* expected to be processed together must be in the same group. DefaultPartitionGrouper implements this
* interface. See {@link DefaultPartitionGrouper} for more information.
*
- * @param topicGroups The map from the {@link TopologyBuilder#topicGroups() topic group} id to topics
+ * @param topicGroups The map from the {@link TopologyBuilder#topicGroups(String) topic group} id to topics
* @param metadata Metadata of the consuming cluster
* @return a map of task ids to groups of partitions
*/
Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata);
-}
+
+}
\ No newline at end of file
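
To make the contract concrete, a toy PartitionGrouper sketch that gives each partition its own task. The TaskId(topicGroupId, partition) constructor is assumed from DefaultPartitionGrouper's usage; Cluster#partitionsForTopic is the standard client metadata API:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class OneTaskPerPartitionGrouper implements PartitionGrouper {
        @Override
        public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
            Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
            for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
                for (String topic : entry.getValue()) {
                    List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
                    if (infos == null)
                        continue;  // topic not present in the consuming cluster's metadata
                    for (PartitionInfo info : infos) {
                        // task id = (topic group id, partition); co-partitioned topics
                        // with the same partition number land in the same task
                        TaskId id = new TaskId(entry.getKey(), info.partition());
                        Set<TopicPartition> group = groups.get(id);
                        if (group == null) {
                            group = new HashSet<>();
                            groups.put(id, group);
                        }
                        group.add(new TopicPartition(topic, info.partition()));
                    }
                }
            }
            return groups;
        }
    }
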
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 79376ba..e9d5252 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -29,11 +29,11 @@ import java.io.File;
public interface ProcessorContext {
/**
- * Returns the job id
+ * Returns the application id
*
- * @return the job id
+ * @return the application id
*/
- String jobId();
+ String applicationId();
/**
* Returns the task id
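
A short usage sketch of the renamed accessor inside a processor's init method, as in the WordCount demo above (taskId() is the neighbouring getter on this interface):

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // e.g. tag log lines with the owning application and task
        System.out.println("application " + context.applicationId() + ", task " + context.taskId());
    }
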
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 6e5aec5..ab7122b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -85,7 +85,7 @@ public class TopologyBuilder {
this.name = name;
}
- public abstract ProcessorNode build(String jobId);
+ public abstract ProcessorNode build(String applicationId);
}
private static class ProcessorNodeFactory extends NodeFactory {
@@ -105,7 +105,7 @@ public class TopologyBuilder {
@SuppressWarnings("unchecked")
@Override
- public ProcessorNode build(String jobId) {
+ public ProcessorNode build(String applicationId) {
return new ProcessorNode(name, supplier.get(), stateStoreNames);
}
}
@@ -124,7 +124,7 @@ public class TopologyBuilder {
@SuppressWarnings("unchecked")
@Override
- public ProcessorNode build(String jobId) {
+ public ProcessorNode build(String applicationId) {
return new SourceNode(name, keyDeserializer, valDeserializer);
}
}
@@ -147,10 +147,10 @@ public class TopologyBuilder {
@SuppressWarnings("unchecked")
@Override
- public ProcessorNode build(String jobId) {
+ public ProcessorNode build(String applicationId) {
if (internalTopicNames.contains(topic)) {
- // prefix the job id to the internal topic name
- return new SinkNode(name, jobId + "-" + topic, keySerializer, valSerializer, partitioner);
+ // prefix the internal topic name with the application id
+ return new SinkNode(name, applicationId + "-" + topic, keySerializer, valSerializer, partitioner);
} else {
return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
}
@@ -496,7 +496,7 @@ public class TopologyBuilder {
*
* @return groups of topic names
*/
- public Map<Integer, TopicsInfo> topicGroups(String jobId) {
+ public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
if (nodeGroups == null)
@@ -514,8 +514,8 @@ public class TopologyBuilder {
// if some of the topics are internal, add them to the internal topics
for (String topic : topics) {
if (this.internalTopicNames.contains(topic)) {
- // prefix the job id to the internal topic name
- String internalTopic = jobId + "-" + topic;
+ // prefix the internal topic name with the application id
+ String internalTopic = applicationId + "-" + topic;
internalSourceTopics.add(internalTopic);
sourceTopics.add(internalTopic);
} else {
@@ -528,8 +528,8 @@ public class TopologyBuilder {
String topic = nodeToSinkTopic.get(node);
if (topic != null) {
if (internalTopicNames.contains(topic)) {
- // prefix the job id to the change log topic name
- sinkTopics.add(jobId + "-" + topic);
+ // prefix the change log topic name with the application id
+ sinkTopics.add(applicationId + "-" + topic);
} else {
sinkTopics.add(topic);
}
@@ -538,8 +538,8 @@ public class TopologyBuilder {
// if the node is connected to a state, add to the state topics
for (StateStoreFactory stateFactory : stateFactories.values()) {
if (stateFactory.isInternal && stateFactory.users.contains(node)) {
- // prefix the job id to the change log topic name
- stateChangelogTopics.add(jobId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+ // prefix the change log topic name with the application id
+ stateChangelogTopics.add(applicationId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
}
}
}
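
The three hunks above apply one naming rule: topics the application creates for itself are namespaced by the application id. A helper sketch of that rule (the "-changelog" suffix value is an assumption about ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX):

    // internal source/sink topic: "<application.id>-<topic>"
    static String internalTopic(String applicationId, String topic) {
        return applicationId + "-" + topic;
    }

    // state store changelog: "<application.id>-<store>" + suffix (suffix assumed "-changelog")
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
    }
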
@@ -637,7 +637,7 @@ public class TopologyBuilder {
*
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
*/
- public ProcessorTopology build(String jobId, Integer topicGroupId) {
+ public ProcessorTopology build(String applicationId, Integer topicGroupId) {
Set<String> nodeGroup;
if (topicGroupId != null) {
nodeGroup = nodeGroups().get(topicGroupId);
@@ -645,11 +645,11 @@ public class TopologyBuilder {
// when nodeGroup is null, we build the full topology. this is used in some tests.
nodeGroup = null;
}
- return build(jobId, nodeGroup);
+ return build(applicationId, nodeGroup);
}
@SuppressWarnings("unchecked")
- private ProcessorTopology build(String jobId, Set<String> nodeGroup) {
+ private ProcessorTopology build(String applicationId, Set<String> nodeGroup) {
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Map<String, ProcessorNode> processorMap = new HashMap<>();
Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -658,7 +658,7 @@ public class TopologyBuilder {
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories.values()) {
if (nodeGroup == null || nodeGroup.contains(factory.name)) {
- ProcessorNode node = factory.build(jobId);
+ ProcessorNode node = factory.build(applicationId);
processorNodes.add(node);
processorMap.put(node.name(), node);
@@ -674,8 +674,8 @@ public class TopologyBuilder {
} else if (factory instanceof SourceNodeFactory) {
for (String topic : ((SourceNodeFactory) factory).topics) {
if (internalTopicNames.contains(topic)) {
- // prefix the job id to the internal topic name
- topicSourceMap.put(jobId + "-" + topic, (SourceNode) node);
+ // prefix the internal topic name with the application id
+ topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node);
} else {
topicSourceMap.put(topic, (SourceNode) node);
}
@@ -697,11 +697,11 @@ public class TopologyBuilder {
* Get the names of topics that are to be consumed by the source nodes created by this builder.
* @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
*/
- public Set<String> sourceTopics(String jobId) {
+ public Set<String> sourceTopics(String applicationId) {
Set<String> topics = new HashSet<>();
for (String topic : sourceTopicNames) {
if (internalTopicNames.contains(topic)) {
- topics.add(jobId + "-" + topic);
+ topics.add(applicationId + "-" + topic);
} else {
topics.add(topic);
}