Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/16 23:51:03 UTC

kafka git commit: MINOR: a few web doc and javadoc fixes

Repository: kafka
Updated Branches:
  refs/heads/trunk c2d9a2f30 -> ef4914520


MINOR: a few web doc and javadoc fixes

1. Added missing Javadocs in public interfaces.
2. Added missing upgrade web docs.
3. Minor improvements on exception messages.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Antony Stubbs <an...@gmail.com>

Closes #4071 from guozhangwang/KMinor-javadoc-gaps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef491452
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef491452
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef491452

Branch: refs/heads/trunk
Commit: ef4914520019e941827dac8eda6000a82cb74cc5
Parents: c2d9a2f
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Oct 16 16:50:59 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 16 16:50:59 2017 -0700

----------------------------------------------------------------------
 docs/streams/upgrade-guide.html                 | 22 ++++++++++++++------
 .../org/apache/kafka/streams/KafkaStreams.java  |  9 +++++---
 .../kafka/streams/processor/Cancellable.java    |  9 +++++++-
 .../kafka/streams/processor/Punctuator.java     |  8 ++++++-
 .../apache/kafka/streams/KafkaStreamsTest.java  |  6 +++---
 5 files changed, 40 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/docs/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b7bf19a..2974058 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -103,21 +103,31 @@
     </ul>
 
     <p>
-        Deprecated methods in <code>KafkaStreams</code>:
+        Deprecated / modified methods in <code>KafkaStreams</code>:
     </p>
     <ul>
-        <li><code>toString()</code>, <code>toString(final String indent)</code> were previously used to return static and runtime information.
+        <li>
+            <code>toString()</code>, <code>toString(final String indent)</code> were previously used to return static and runtime information.
             They have been deprecated in favor of using the new classes/methods <code>#localThreadsMetadata()</code> / <code>ThreadMetadata</code> (returning runtime information) and
             <code>TopologyDescription</code> / <code>Topology#describe()</code> (returning static information).
         </li>
-        <li>With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
+        <li>
+            With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
             you should no longer pass in <code>Serde</code> to <code>KStream#print</code> operations.
             If you can't rely on using <code>toString</code> to print your keys and values, you should instead provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call.
         </li>
         <li>
-            Windowed aggregations have moved from <code>KGroupedStream</code> to <code>WindowedKStream</code>.
+            <code>setStateListener()</code> can now only be called before the application starts running, i.e. before <code>KafkaStreams.start()</code> is called.
+        </li>
+    </ul>
+
+    <p>
+        Deprecated methods in <code>KGroupedStream</code>:
+    </p>
+    <ul>
+        <li>
+            Windowed aggregations have been deprecated from <code>KGroupedStream</code> and moved to <code>WindowedKStream</code>.
             You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
-            Note: the previous aggregate functions on <code>KGroupedStream</code> still work, but have been deprecated.
         </li>
     </ul>
 
@@ -216,7 +226,7 @@
         <li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to iterate over the keys of a <code>KTable</code>. </li>
     </ul>
     <p>
-        If you want to view the changelog stream of the  <code>KTable</code> then you could call <code>KTable.toStream().print()</code>.
+        If you want to view the changelog stream of the <code>KTable</code> then you could call <code>KTable.toStream().print(Printed.toSysOut())</code>.
     </p>
 
     <p> Metrics using exactly-once semantics: </p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f5eb0a0..ae4ef34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -336,7 +336,8 @@ public class KafkaStreams {
         if (state == State.CREATED) {
             stateListener = listener;
         } else {
-            throw new IllegalStateException("Can only set StateListener in CREATED state.");
+            throw new IllegalStateException("Can only set StateListener in CREATED state. " +
+                    "Current state is: " + state);
         }
     }
 
@@ -357,7 +358,8 @@ public class KafkaStreams {
                 globalStreamThread.setUncaughtExceptionHandler(eh);
             }
         } else {
-            throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state.");
+            throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
         }
     }
 
@@ -372,7 +374,8 @@ public class KafkaStreams {
         if (state == State.CREATED) {
             this.globalStateRestoreListener = globalStateRestoreListener;
         } else {
-            throw new IllegalStateException("Can only set the GlobalRestoreListener in the CREATED state");
+            throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. " +
+                    "Current state is: " + state);
         }
     }
 

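Illustration (not part of the commit): the three setters changed above share one guard, which accepts the call only in the CREATED state and otherwise throws an IllegalStateException that now reports the current state. The sketch below reproduces that pattern in isolation. StateGuardSketch, Service, and the reduced State enum are hypothetical stand-ins so the example runs without the Kafka Streams library; only the guard and the message text are taken from the diff.

```java
// Hypothetical stand-in for the guard pattern in this commit; this is NOT the real KafkaStreams class.
public class StateGuardSketch {

    // Reduced state set, assumed for illustration only.
    enum State { CREATED, RUNNING, NOT_RUNNING }

    // Stand-in listener type, assumed for illustration only.
    interface StateListener {
        void onChange(State newState, State oldState);
    }

    static class Service {
        private State state = State.CREATED;
        private StateListener stateListener;

        // Same shape as the guard in the diff: allowed only before start().
        synchronized void setStateListener(final StateListener listener) {
            if (state == State.CREATED) {
                stateListener = listener;
            } else {
                throw new IllegalStateException("Can only set StateListener in CREATED state. " +
                        "Current state is: " + state);
            }
        }

        // Moves the service out of CREATED and notifies the listener, if any.
        synchronized void start() {
            final State oldState = state;
            state = State.RUNNING;
            if (stateListener != null) {
                stateListener.onChange(state, oldState);
            }
        }
    }

    public static void main(final String[] args) {
        final Service service = new Service();
        // Registering before start() is accepted.
        service.setStateListener((newState, oldState) ->
                System.out.println(oldState + " -> " + newState));
        service.start();
        try {
            // Registering after start() is rejected; the message names the current state.
            service.setStateListener(null);
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the message now ends with a state that varies with when the setter is called, callers are probably better off checking the exception type than comparing the full message string.
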
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
index 82c9edd..2e56b56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
@@ -16,8 +16,15 @@
  */
 package org.apache.kafka.streams.processor;
 
+/**
+ * Cancellable interface returned in {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)}.
+ *
+ * @see Punctuator
+ */
 public interface Cancellable {
 
+    /**
+     * Cancel the scheduled operation to avoid future calls.
+     */
     void cancel();
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
index 200c1af..407270f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -18,9 +18,15 @@ package org.apache.kafka.streams.processor;
 
 /**
  * A functional interface used as an argument to {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)}.
+ *
+ * @see Cancellable
  */
 public interface Punctuator {
 
+    /**
+     * Perform the scheduled periodic operation.
+     *
+     * @param timestamp when the operation is being called, depending on {@link PunctuationType}
+     */
     void punctuate(long timestamp);
-
 }

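Illustration (not part of the commit): the Javadocs added above describe a Punctuator as the periodic callback and a Cancellable as the handle that stops future calls. The sketch below shows how the two fit together. The two interfaces are local mirrors of the ones in the diff, and ManualScheduler is a hypothetical replacement for ProcessorContext#schedule in which the caller advances time by hand. It takes no PunctuationType and makes no claim about how Kafka Streams drives punctuation internally.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuationSketch {

    // Local mirrors of the two interfaces documented in this commit.
    interface Punctuator {
        void punctuate(long timestamp);
    }

    interface Cancellable {
        void cancel();
    }

    // Hypothetical scheduler; time only moves when advanceTo() is called.
    static class ManualScheduler {

        private static class Entry {
            final long interval;
            final Punctuator callback;
            long nextFire;
            boolean cancelled;

            Entry(final long interval, final Punctuator callback, final long nextFire) {
                this.interval = interval;
                this.callback = callback;
                this.nextFire = nextFire;
            }
        }

        private final List<Entry> entries = new ArrayList<>();
        private long now = 0L;

        // Registers a periodic callback and returns a handle that stops future calls.
        Cancellable schedule(final long interval, final Punctuator callback) {
            final Entry entry = new Entry(interval, callback, now + interval);
            entries.add(entry);
            return () -> entry.cancelled = true;
        }

        // Fires every due, non-cancelled callback once per elapsed interval.
        void advanceTo(final long timestamp) {
            now = timestamp;
            for (final Entry entry : entries) {
                while (!entry.cancelled && entry.nextFire <= now) {
                    entry.callback.punctuate(entry.nextFire);
                    entry.nextFire += entry.interval;
                }
            }
        }
    }

    public static void main(final String[] args) {
        final ManualScheduler scheduler = new ManualScheduler();
        final List<Long> seen = new ArrayList<>();
        final Cancellable handle = scheduler.schedule(10L, ts -> seen.add(ts));

        scheduler.advanceTo(25L); // callback is due at 10 and 20
        handle.cancel();          // no further calls after this point
        scheduler.advanceTo(50L); // nothing fires

        System.out.println(seen); // prints [10, 20]
    }
}
```
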
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef491452/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 4bd2890..69b4584 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -266,7 +266,7 @@ public class KafkaStreamsTest {
             streams.setGlobalStateRestoreListener(new MockStateRestoreListener());
             fail("Should throw an IllegalStateException");
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Can only set the GlobalRestoreListener in the CREATED state", e.getMessage());
+            // expected
         } finally {
             streams.close();
         }
@@ -279,7 +279,7 @@ public class KafkaStreamsTest {
             streams.setUncaughtExceptionHandler(null);
             fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Can only set UncaughtExceptionHandler in CREATED state.", e.getMessage());
+            // expected
         } finally {
             streams.close();
         }
@@ -292,7 +292,7 @@ public class KafkaStreamsTest {
             streams.setStateListener(null);
             fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Can only set StateListener in CREATED state.", e.getMessage());
+            // expected
         } finally {
             streams.close();
         }