You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/16 23:51:03 UTC
kafka git commit: MINOR: a few web doc and javadoc fixes
Repository: kafka
Updated Branches:
refs/heads/trunk c2d9a2f30 -> ef4914520
MINOR: a few web doc and javadoc fixes
1. Added missing Javadocs in public interfaces.
2. Added missing upgrade web docs.
3. Minor improvements on exception messages.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Antony Stubbs <an...@gmail.com>
Closes #4071 from guozhangwang/KMinor-javadoc-gaps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef491452
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef491452
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef491452
Branch: refs/heads/trunk
Commit: ef4914520019e941827dac8eda6000a82cb74cc5
Parents: c2d9a2f
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Oct 16 16:50:59 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 16 16:50:59 2017 -0700
----------------------------------------------------------------------
docs/streams/upgrade-guide.html | 22 ++++++++++++++------
.../org/apache/kafka/streams/KafkaStreams.java | 9 +++++---
.../kafka/streams/processor/Cancellable.java | 9 +++++++-
.../kafka/streams/processor/Punctuator.java | 8 ++++++-
.../apache/kafka/streams/KafkaStreamsTest.java | 6 +++---
5 files changed, 40 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/docs/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b7bf19a..2974058 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -103,21 +103,31 @@
</ul>
<p>
- Deprecated methods in <code>KafkaStreams</code>:
+ Deprecated / modified methods in <code>KafkaStreams</code>:
</p>
<ul>
- <li><code>toString()</code>, <code>toString(final String indent)</code> were previously used to return static and runtime information.
+ <li>
+ <code>toString()</code>, <code>toString(final String indent)</code> were previously used to return static and runtime information.
They have been deprecated in favor of using the new classes/methods <code>#localThreadsMetadata()</code> / <code>ThreadMetadata</code> (returning runtime information) and
<code>TopologyDescription</code> / <code>Topology#describe()</code> (returning static information).
</li>
- <li>With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
+ <li>
+ With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
you should no longer pass in <code>Serde</code> to <code>KStream#print</code> operations.
If you can't rely on using <code>toString</code> to print your keys an values, you should instead you provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call.
</li>
<li>
- Windowed aggregations have moved from <code>KGroupedStream</code> to <code>WindowedKStream</code>.
+ <code>setStateListener()</code> now can only be set before the application start running, i.e. before <code>KafkaStreams.start()</code> is called.
+ </li>
+ </ul>
+
+ <p>
+ Deprecated methods in <code>KGroupedStream</code>
+ </p>
+ <ul>
+ <li>
+ Windowed aggregations have been deprecated from <code>KGroupedStream</code> and moved to <code>WindowedKStream</code>.
You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
- Note: the previous aggregate functions on <code>KGroupedStream</code> still work, but have been deprecated.
</li>
</ul>
@@ -216,7 +226,7 @@
<li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to iterate over the keys of a <code>KTable</code>. </li>
</ul>
<p>
- If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print()</code>.
+ If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print(Printed.toSysOut)</code>.
</p>
<p> Metrics using exactly-once semantics: </p>
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f5eb0a0..ae4ef34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -336,7 +336,8 @@ public class KafkaStreams {
if (state == State.CREATED) {
stateListener = listener;
} else {
- throw new IllegalStateException("Can only set StateListener in CREATED state.");
+ throw new IllegalStateException("Can only set StateListener in CREATED state. " +
+ "Current state is: " + state);
}
}
@@ -357,7 +358,8 @@ public class KafkaStreams {
globalStreamThread.setUncaughtExceptionHandler(eh);
}
} else {
- throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state.");
+ throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+ "Current state is: " + state);
}
}
@@ -372,7 +374,8 @@ public class KafkaStreams {
if (state == State.CREATED) {
this.globalStateRestoreListener = globalStateRestoreListener;
} else {
- throw new IllegalStateException("Can only set the GlobalRestoreListener in the CREATED state");
+ throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. " +
+ "Current state is: " + state);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
index 82c9edd..2e56b56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
@@ -16,8 +16,15 @@
*/
package org.apache.kafka.streams.processor;
+/**
+ * Cancellable interface returned in {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)}.
+ *
+ * @see Punctuator
+ */
public interface Cancellable {
+ /**
+ * Cancel the scheduled operation to avoid future calls.
+ */
void cancel();
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
index 200c1af..407270f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -18,9 +18,15 @@ package org.apache.kafka.streams.processor;
/**
* A functional interface used as an argument to {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)}.
+ *
+ * @see Cancellable
*/
public interface Punctuator {
+ /**
+ * Perform the scheduled periodic operation.
+ *
+ * @param timestamp when the operation is being called, depending on {@link PunctuationType}
+ */
void punctuate(long timestamp);
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 4bd2890..69b4584 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -266,7 +266,7 @@ public class KafkaStreamsTest {
streams.setGlobalStateRestoreListener(new MockStateRestoreListener());
fail("Should throw an IllegalStateException");
} catch (final IllegalStateException e) {
- Assert.assertEquals("Can only set the GlobalRestoreListener in the CREATED state", e.getMessage());
+ // expected
} finally {
streams.close();
}
@@ -279,7 +279,7 @@ public class KafkaStreamsTest {
streams.setUncaughtExceptionHandler(null);
fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- Assert.assertEquals("Can only set UncaughtExceptionHandler in CREATED state.", e.getMessage());
+ // expected
} finally {
streams.close();
}
@@ -292,7 +292,7 @@ public class KafkaStreamsTest {
streams.setStateListener(null);
fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- Assert.assertEquals("Can only set StateListener in CREATED state.", e.getMessage());
+ // expected
} finally {
streams.close();
}