You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/06/30 20:59:38 UTC

kafka git commit: KAFKA-3846: KIP-65: include timestamp in Connect record types

Repository: kafka
Updated Branches:
  refs/heads/trunk 3605ffa30 -> 44ad7b574


KAFKA-3846: KIP-65: include timestamp in Connect record types

https://cwiki.apache.org/confluence/display/KAFKA/KIP-65%3A+Expose+timestamps+to+Connect

Author: Shikhar Bhushan <sh...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1537 from shikhar/kafka-3846


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44ad7b57
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44ad7b57
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44ad7b57

Branch: refs/heads/trunk
Commit: 44ad7b574e376f958cf1d3893156d666f904cdd3
Parents: 3605ffa
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Thu Jun 30 13:59:31 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Jun 30 13:59:31 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/connector/ConnectRecord.java  | 19 +++++--
 .../apache/kafka/connect/sink/SinkRecord.java   | 21 ++++++-
 .../kafka/connect/source/SourceRecord.java      | 10 +++-
 .../kafka/connect/runtime/WorkerSinkTask.java   |  5 +-
 .../kafka/connect/runtime/WorkerSourceTask.java |  6 +-
 .../connect/runtime/WorkerSinkTaskTest.java     | 59 +++++++++++++++++++-
 .../runtime/WorkerSinkTaskThreadedTest.java     |  9 ++-
 .../connect/runtime/WorkerSourceTaskTest.java   | 21 +++++++
 8 files changed, 133 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index 21f0944..25690a2 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -36,18 +36,19 @@ public abstract class ConnectRecord {
     private final Object key;
     private final Schema valueSchema;
     private final Object value;
+    private final Long timestamp;
 
-    public ConnectRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) {
-        this(topic, kafkaPartition, null, null, valueSchema, value);
-    }
-
-    public ConnectRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) {
+    public ConnectRecord(String topic, Integer kafkaPartition,
+                         Schema keySchema, Object key,
+                         Schema valueSchema, Object value,
+                         Long timestamp) {
         this.topic = topic;
         this.kafkaPartition = kafkaPartition;
         this.keySchema = keySchema;
         this.key = key;
         this.valueSchema = valueSchema;
         this.value = value;
+        this.timestamp = timestamp;
     }
 
     public String topic() {
@@ -74,6 +75,10 @@ public abstract class ConnectRecord {
         return valueSchema;
     }
 
+    public Long timestamp() {
+        return timestamp;
+    }
+
     @Override
     public String toString() {
         return "ConnectRecord{" +
@@ -81,6 +86,7 @@ public abstract class ConnectRecord {
                 ", kafkaPartition=" + kafkaPartition +
                 ", key=" + key +
                 ", value=" + value +
+                ", timestamp=" + timestamp +
                 '}';
     }
 
@@ -105,6 +111,8 @@ public abstract class ConnectRecord {
             return false;
         if (value != null ? !value.equals(that.value) : that.value != null)
             return false;
+        if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
+            return false;
 
         return true;
     }
@@ -117,6 +125,7 @@ public abstract class ConnectRecord {
         result = 31 * result + (key != null ? key.hashCode() : 0);
         result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
         result = 31 * result + (value != null ? value.hashCode() : 0);
+        result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
         return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
index 0bd0f6f..fbe1bdc 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.sink;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 
@@ -25,20 +26,34 @@ import org.apache.kafka.connect.data.Schema;
  * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the kafkaOffset of
  * the record in the Kafka topic-partition in addition to the standard fields. This information
  * should be used by the SinkTask to coordinate kafkaOffset commits.
+ *
+ * It also includes the {@link TimestampType}, which may be {@link TimestampType#NO_TIMESTAMP_TYPE}, and the relevant
+ * timestamp, which may be {@code null}.
  */
 @InterfaceStability.Unstable
 public class SinkRecord extends ConnectRecord {
     private final long kafkaOffset;
+    private final TimestampType timestampType;
 
     public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset) {
-        super(topic, partition, keySchema, key, valueSchema, value);
+        this(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, null, TimestampType.NO_TIMESTAMP_TYPE);
+    }
+
+    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
+                      Long timestamp, TimestampType timestampType) {
+        super(topic, partition, keySchema, key, valueSchema, value, timestamp);
         this.kafkaOffset = kafkaOffset;
+        this.timestampType = timestampType;
     }
 
     public long kafkaOffset() {
         return kafkaOffset;
     }
 
+    public TimestampType timestampType() {
+        return timestampType;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
@@ -53,13 +68,14 @@ public class SinkRecord extends ConnectRecord {
         if (kafkaOffset != that.kafkaOffset)
             return false;
 
-        return true;
+        return timestampType == that.timestampType;
     }
 
     @Override
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
+        result = 31 * result + timestampType.hashCode();
         return result;
     }
 
@@ -67,6 +83,7 @@ public class SinkRecord extends ConnectRecord {
     public String toString() {
         return "SinkRecord{" +
                 "kafkaOffset=" + kafkaOffset +
+                ", timestampType=" + timestampType +
                 "} " + super.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
index b2b29bf..327b67b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
@@ -64,7 +64,15 @@ public class SourceRecord extends ConnectRecord {
     public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                         String topic, Integer partition,
                         Schema keySchema, Object key, Schema valueSchema, Object value) {
-        super(topic, partition, keySchema, key, valueSchema, value);
+        this(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, null);
+    }
+
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+                        String topic, Integer partition,
+                        Schema keySchema, Object key,
+                        Schema valueSchema, Object value,
+                        Long timestamp) {
+        super(topic, partition, keySchema, key, valueSchema, value, timestamp);
         this.sourcePartition = sourcePartition;
         this.sourceOffset = sourceOffset;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1aef3bb..fbc2307 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -359,7 +360,9 @@ class WorkerSinkTask extends WorkerTask {
                     new SinkRecord(msg.topic(), msg.partition(),
                             keyAndSchema.schema(), keyAndSchema.value(),
                             valueAndSchema.schema(), valueAndSchema.value(),
-                            msg.offset())
+                            msg.offset(),
+                            (msg.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) ? null : msg.timestamp(),
+                            msg.timestampType())
             );
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 83d1c84..6d91b36 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -181,7 +181,7 @@ class WorkerSourceTask extends WorkerTask {
         for (final SourceRecord record : toSend) {
             byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
             byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
-            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
+            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), record.timestamp(), key, value);
             log.trace("Appending record with key {}, value {}", record.key(), record.value());
             // We need this queued first since the callback could happen immediately (even synchronously in some cases).
             // Because of this we need to be careful about handling retries -- we always save the previously attempted
@@ -211,9 +211,7 @@ class WorkerSourceTask extends WorkerTask {
                                     // user overrode these settings, the best we can do is notify them of the failure via
                                     // logging.
                                     log.error("{} failed to send record to {}: {}", id, record.topic(), e);
-                                    log.debug("Failed record: topic {}, Kafka partition {}, key {}, value {}, source offset {}, source partition {}",
-                                            record.topic(), record.kafkaPartition(), record.key(), record.value(),
-                                            record.sourceOffset(), record.sourcePartition());
+                                    log.debug("Failed record: {}", record);
                                 } else {
                                     log.trace("Wrote record successfully: topic {} partition {} offset {}",
                                             recordMetadata.topic(), recordMetadata.partition(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 6a14074..dbb3f8d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
@@ -348,6 +349,58 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testMissingTimestampPropagation() throws Exception {
+        expectInitializeTask();
+        expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
+        expectConvertMessages(1);
+
+        Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
+
+        sinkTask.put(EasyMock.capture(records));
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.poll(Long.MAX_VALUE);
+
+        SinkRecord record = records.getValue().iterator().next();
+
+        // we expect null for missing timestamp, the sentinel value of Record.NO_TIMESTAMP is Kafka's API
+        assertEquals(null, record.timestamp());
+        assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTimestampPropagation() throws Exception {
+        final Long timestamp = System.currentTimeMillis();
+        final TimestampType timestampType = TimestampType.CREATE_TIME;
+
+        expectInitializeTask();
+        expectConsumerPoll(1, timestamp, timestampType);
+        expectConvertMessages(1);
+
+        Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
+
+        sinkTask.put(EasyMock.capture(records));
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.poll(Long.MAX_VALUE);
+
+        SinkRecord record = records.getValue().iterator().next();
+
+        assertEquals(timestamp, record.timestamp());
+        assertEquals(timestampType, record.timestampType());
+
+        PowerMock.verifyAll();
+    }
+
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
@@ -431,13 +484,17 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectConsumerPoll(final int numMessages) {
+        expectConsumerPoll(numMessages, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
+    }
+
+    private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                         List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
                         for (int i = 0; i < numMessages; i++)
-                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE));
+                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
                         recordsReturned += numMessages;
                         return new ConsumerRecords<>(
                                 numMessages > 0 ?

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 25dbff5..392985b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -87,6 +87,9 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
 
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    private static final long TIMESTAMP = 42L;
+    private static final TimestampType TIMESTAMP_TYPE = TimestampType.CREATE_TIME;
+
     static {
         TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
         TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
@@ -161,7 +164,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
             assertEquals(1, recs.size());
             for (SinkRecord rec : recs) {
                 SinkRecord referenceSinkRecord
-                        = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset);
+                        = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset, TIMESTAMP, TIMESTAMP_TYPE);
                 assertEquals(referenceSinkRecord, rec);
                 offset++;
             }
@@ -517,7 +520,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                                 Collections.singletonMap(
                                         new TopicPartition(TOPIC, PARTITION),
                                         Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
                                         )));
                         recordsReturned++;
                         return records;
@@ -546,7 +549,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                                 Collections.singletonMap(
                                         new TopicPartition(TOPIC, PARTITION),
                                         Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
                                         )));
                         recordsReturned++;
                         return records;

http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index ab9863c..9854f22 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -392,6 +392,27 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     @Test
+    public void testSendRecordsPropagatesTimestamp() throws Exception {
+        final Long timestamp = System.currentTimeMillis();
+
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(timestamp, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testSendRecordsRetries() throws Exception {
         createWorkerTask();