You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/01 17:22:00 UTC
kafka git commit: HOTFIX: improve error message on invalid input record timestamp
Repository: kafka
Updated Branches:
refs/heads/0.10.1 baae90a1e -> f3e5e6d65
HOTFIX: improve error message on invalid input record timestamp
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2079 from mjsax/hotfixTSExtractor-0.10.1
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f3e5e6d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f3e5e6d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f3e5e6d6
Branch: refs/heads/0.10.1
Commit: f3e5e6d65d00204a4f15f60b1f534a3b03048e71
Parents: baae90a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Nov 1 10:21:56 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 1 10:21:56 2016 -0700
----------------------------------------------------------------------
.../streams/processor/internals/SinkNode.java | 12 +-
.../processor/internals/SinkNodeTest.java | 145 +++++++++++++++++++
2 files changed, 156 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f3e5e6d6/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2b5692d..c330ea9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -69,7 +70,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
@Override
public void process(final K key, final V value) {
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner);
+
+ final long timestamp = context.timestamp();
+ if (timestamp < 0) {
+ throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " +
+ "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
+ "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
+ "Use a different TimestampExtractor to process this data.");
+ }
+
+ collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f3e5e6d6/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
new file mode 100644
index 0000000..3b41517
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+public class SinkNodeTest {
+
+ @Test(expected = StreamsException.class)
+ @SuppressWarnings("unchecked")
+ public void invalidInputRecordTimestampTest() {
+ final Serializer anySerializer = Serdes.Bytes().serializer();
+
+ final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+ sink.init(new MockProcessorContext());
+
+ sink.process(null, null);
+ }
+
+ private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
+ private final long invalidTimestamp = -1;
+
+ @Override
+ public String applicationId() {
+ return null;
+ }
+
+ @Override
+ public TaskId taskId() {
+ return null;
+ }
+
+ @Override
+ public Serde<?> keySerde() {
+ return null;
+ }
+
+ @Override
+ public Serde<?> valueSerde() {
+ return null;
+ }
+
+ @Override
+ public File stateDir() {
+ return null;
+ }
+
+ @Override
+ public StreamsMetrics metrics() {
+ return null;
+ }
+
+ @Override
+ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
+ }
+
+ @Override
+ public StateStore getStateStore(String name) {
+ return null;
+ }
+
+ @Override
+ public void schedule(long interval) {
+ }
+
+ @Override
+ public <K, V> void forward(K key, V value) {
+ }
+
+ @Override
+ public <K, V> void forward(K key, V value, int childIndex) {
+ }
+
+ @Override
+ public <K, V> void forward(K key, V value, String childName) {
+ }
+
+ @Override
+ public void commit() {
+ }
+
+ @Override
+ public String topic() {
+ return null;
+ }
+
+ @Override
+ public int partition() {
+ return 0;
+ }
+
+ @Override
+ public long offset() {
+ return 0;
+ }
+
+ @Override
+ public long timestamp() {
+ return invalidTimestamp;
+ }
+
+ @Override
+ public Map<String, Object> appConfigs() {
+ return null;
+ }
+
+ @Override
+ public Map<String, Object> appConfigsWithPrefix(String prefix) {
+ return null;
+ }
+
+ @Override
+ public RecordCollector recordCollector() {
+ return null;
+ }
+ }
+
+}
\ No newline at end of file