You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/15 21:29:33 UTC

[kafka] branch trunk updated: MINOR: improve Punctuation JavaDocs and add runtime argument check (#5895)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 80eb2c2  MINOR: improve Punctuation JavaDocs and add runtime argument check (#5895)
80eb2c2 is described below

commit 80eb2c28f6b8bef603b5281f94cb32f5dadbeb7d
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Nov 15 13:29:25 2018 -0800

    MINOR: improve Punctuation JavaDocs and add runtime argument check (#5895)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/streams/processor/ProcessorContext.java  |  4 +-
 .../processor/internals/ProcessorContextImpl.java  |  7 +-
 .../processor/internals/ProcessorContextTest.java  | 78 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 8ec06d5..b654410 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor;
 
-import java.time.Duration;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
@@ -24,6 +23,7 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Map;
 
 /**
@@ -157,7 +157,7 @@ public interface ProcessorContext {
      *   <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...</li>
      * </ul>
      *
-     * @param interval the time interval between punctuations
+     * @param interval the time interval between punctuations (supported minimum is 1 millisecond)
      * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
      * @param callback a function consuming timestamps representing the current stream or system time
      * @return a handle allowing cancellation of the punctuation schedule established by this method
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 21e1c17..7c18117 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,10 +16,9 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.time.Duration;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -29,6 +28,7 @@ import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
+import java.time.Duration;
 import java.util.List;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
@@ -154,6 +154,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     @Override
     @Deprecated
     public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
+        if (interval < 1) { // reject zero and negative intervals before handing off to the task
+            throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
+        }
         return task.schedule(interval, type, callback);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
new file mode 100644
index 0000000..1523995
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public class ProcessorContextTest { // checks that schedule() rejects punctuation intervals shorter than 1 ms
+    private ProcessorContext context;
+
+    @Before
+    public void prepare() { // builds a ProcessorContextImpl whose collaborators are all EasyMock mocks
+        final StreamsConfig streamsConfig = mock(StreamsConfig.class);
+        expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id");
+        expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
+        expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
+        replay(streamsConfig); // switch the config mock from recording to replaying the expectations above
+
+        context = new ProcessorContextImpl(
+            mock(TaskId.class),
+            mock(StreamTask.class),
+            streamsConfig,
+            mock(RecordCollector.class),
+            mock(ProcessorStateManager.class),
+            mock(StreamsMetricsImpl.class),
+            mock(ThreadCache.class)
+        );
+    }
+
+    @Test
+    public void shouldNotAllowToScheduleZeroMillisecondPunctuation() {
+        try {
+            context.schedule(0, null, null); // type and callback are null: only the interval is under test
+            fail("Should have thrown IllegalArgumentException");
+        } catch (final IllegalArgumentException expected) {
+            assertThat(expected.getMessage(), equalTo("The minimum supported scheduling interval is 1 millisecond."));
+        }
+    }
+
+    @Test
+    public void shouldNotAllowToScheduleSubMillisecondPunctuation() {
+        try {
+            context.schedule(Duration.ofNanos(999_999L), null, null); // one nanosecond short of a full millisecond
+            fail("Should have thrown IllegalArgumentException");
+        } catch (final IllegalArgumentException expected) {
+            assertThat(expected.getMessage(), equalTo("The minimum supported scheduling interval is 1 millisecond."));
+        }
+    }
+}