You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/06/30 20:59:38 UTC
kafka git commit: KAFKA-3846: KIP-65: include timestamp in Connect
record types
Repository: kafka
Updated Branches:
refs/heads/trunk 3605ffa30 -> 44ad7b574
KAFKA-3846: KIP-65: include timestamp in Connect record types
https://cwiki.apache.org/confluence/display/KAFKA/KIP-65%3A+Expose+timestamps+to+Connect
Author: Shikhar Bhushan <sh...@confluent.io>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1537 from shikhar/kafka-3846
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44ad7b57
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44ad7b57
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44ad7b57
Branch: refs/heads/trunk
Commit: 44ad7b574e376f958cf1d3893156d666f904cdd3
Parents: 3605ffa
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Thu Jun 30 13:59:31 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Jun 30 13:59:31 2016 -0700
----------------------------------------------------------------------
.../kafka/connect/connector/ConnectRecord.java | 19 +++++--
.../apache/kafka/connect/sink/SinkRecord.java | 21 ++++++-
.../kafka/connect/source/SourceRecord.java | 10 +++-
.../kafka/connect/runtime/WorkerSinkTask.java | 5 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 6 +-
.../connect/runtime/WorkerSinkTaskTest.java | 59 +++++++++++++++++++-
.../runtime/WorkerSinkTaskThreadedTest.java | 9 ++-
.../connect/runtime/WorkerSourceTaskTest.java | 21 +++++++
8 files changed, 133 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index 21f0944..25690a2 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -36,18 +36,19 @@ public abstract class ConnectRecord {
private final Object key;
private final Schema valueSchema;
private final Object value;
+ private final Long timestamp;
- public ConnectRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) {
- this(topic, kafkaPartition, null, null, valueSchema, value);
- }
-
- public ConnectRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) {
+ public ConnectRecord(String topic, Integer kafkaPartition,
+ Schema keySchema, Object key,
+ Schema valueSchema, Object value,
+ Long timestamp) {
this.topic = topic;
this.kafkaPartition = kafkaPartition;
this.keySchema = keySchema;
this.key = key;
this.valueSchema = valueSchema;
this.value = value;
+ this.timestamp = timestamp;
}
public String topic() {
@@ -74,6 +75,10 @@ public abstract class ConnectRecord {
return valueSchema;
}
+ public Long timestamp() {
+ return timestamp;
+ }
+
@Override
public String toString() {
return "ConnectRecord{" +
@@ -81,6 +86,7 @@ public abstract class ConnectRecord {
", kafkaPartition=" + kafkaPartition +
", key=" + key +
", value=" + value +
+ ", timestamp=" + timestamp +
'}';
}
@@ -105,6 +111,8 @@ public abstract class ConnectRecord {
return false;
if (value != null ? !value.equals(that.value) : that.value != null)
return false;
+ if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
+ return false;
return true;
}
@@ -117,6 +125,7 @@ public abstract class ConnectRecord {
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
+ result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
return result;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
index 0bd0f6f..fbe1bdc 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.sink;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
@@ -25,20 +26,34 @@ import org.apache.kafka.connect.data.Schema;
* SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the kafkaOffset of
* the record in the Kafka topic-partition in addition to the standard fields. This information
* should be used by the SinkTask to coordinate kafkaOffset commits.
+ *
+ * It also includes the {@link TimestampType}, which may be {@link TimestampType#NO_TIMESTAMP_TYPE}, and the relevant
+ * timestamp, which may be {@code null}.
*/
@InterfaceStability.Unstable
public class SinkRecord extends ConnectRecord {
private final long kafkaOffset;
+ private final TimestampType timestampType;
public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset) {
- super(topic, partition, keySchema, key, valueSchema, value);
+ this(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, null, TimestampType.NO_TIMESTAMP_TYPE);
+ }
+
+ public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
+ Long timestamp, TimestampType timestampType) {
+ super(topic, partition, keySchema, key, valueSchema, value, timestamp);
this.kafkaOffset = kafkaOffset;
+ this.timestampType = timestampType;
}
public long kafkaOffset() {
return kafkaOffset;
}
+ public TimestampType timestampType() {
+ return timestampType;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -53,13 +68,14 @@ public class SinkRecord extends ConnectRecord {
if (kafkaOffset != that.kafkaOffset)
return false;
- return true;
+ return timestampType == that.timestampType;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
+ result = 31 * result + timestampType.hashCode();
return result;
}
@@ -67,6 +83,7 @@ public class SinkRecord extends ConnectRecord {
public String toString() {
return "SinkRecord{" +
"kafkaOffset=" + kafkaOffset +
+ ", timestampType=" + timestampType +
"} " + super.toString();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
index b2b29bf..327b67b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
@@ -64,7 +64,15 @@ public class SourceRecord extends ConnectRecord {
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition,
Schema keySchema, Object key, Schema valueSchema, Object value) {
- super(topic, partition, keySchema, key, valueSchema, value);
+ this(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, null);
+ }
+
+ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+ String topic, Integer partition,
+ Schema keySchema, Object key,
+ Schema valueSchema, Object value,
+ Long timestamp) {
+ super(topic, partition, keySchema, key, valueSchema, value, timestamp);
this.sourcePartition = sourcePartition;
this.sourceOffset = sourceOffset;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1aef3bb..fbc2307 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -359,7 +360,9 @@ class WorkerSinkTask extends WorkerTask {
new SinkRecord(msg.topic(), msg.partition(),
keyAndSchema.schema(), keyAndSchema.value(),
valueAndSchema.schema(), valueAndSchema.value(),
- msg.offset())
+ msg.offset(),
+ (msg.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) ? null : msg.timestamp(),
+ msg.timestampType())
);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 83d1c84..6d91b36 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -181,7 +181,7 @@ class WorkerSourceTask extends WorkerTask {
for (final SourceRecord record : toSend) {
byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
- final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
+ final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), record.timestamp(), key, value);
log.trace("Appending record with key {}, value {}", record.key(), record.value());
// We need this queued first since the callback could happen immediately (even synchronously in some cases).
// Because of this we need to be careful about handling retries -- we always save the previously attempted
@@ -211,9 +211,7 @@ class WorkerSourceTask extends WorkerTask {
// user overrode these settings, the best we can do is notify them of the failure via
// logging.
log.error("{} failed to send record to {}: {}", id, record.topic(), e);
- log.debug("Failed record: topic {}, Kafka partition {}, key {}, value {}, source offset {}, source partition {}",
- record.topic(), record.kafkaPartition(), record.key(), record.value(),
- record.sourceOffset(), record.sourcePartition());
+ log.debug("Failed record: {}", record);
} else {
log.trace("Wrote record successfully: topic {} partition {} offset {}",
recordMetadata.topic(), recordMetadata.partition(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 6a14074..dbb3f8d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
@@ -348,6 +349,58 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testMissingTimestampPropagation() throws Exception {
+ expectInitializeTask();
+ expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
+ expectConvertMessages(1);
+
+ Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
+
+ sinkTask.put(EasyMock.capture(records));
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.poll(Long.MAX_VALUE);
+
+ SinkRecord record = records.getValue().iterator().next();
+
+ // we expect null for missing timestamp, the sentinel value of Record.NO_TIMESTAMP is Kafka's API
+ assertEquals(null, record.timestamp());
+ assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testTimestampPropagation() throws Exception {
+ final Long timestamp = System.currentTimeMillis();
+ final TimestampType timestampType = TimestampType.CREATE_TIME;
+
+ expectInitializeTask();
+ expectConsumerPoll(1, timestamp, timestampType);
+ expectConvertMessages(1);
+
+ Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
+
+ sinkTask.put(EasyMock.capture(records));
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.poll(Long.MAX_VALUE);
+
+ SinkRecord record = records.getValue().iterator().next();
+
+ assertEquals(timestamp, record.timestamp());
+ assertEquals(timestampType, record.timestampType());
+
+ PowerMock.verifyAll();
+ }
+
private void expectInitializeTask() throws Exception {
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
@@ -431,13 +484,17 @@ public class WorkerSinkTaskTest {
}
private void expectConsumerPoll(final int numMessages) {
+ expectConsumerPoll(numMessages, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
+ }
+
+ private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
for (int i = 0; i < numMessages; i++)
- records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE));
+ records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
recordsReturned += numMessages;
return new ConsumerRecords<>(
numMessages > 0 ?
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 25dbff5..392985b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -87,6 +87,9 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
private static final Map<String, String> TASK_PROPS = new HashMap<>();
+ private static final long TIMESTAMP = 42L;
+ private static final TimestampType TIMESTAMP_TYPE = TimestampType.CREATE_TIME;
+
static {
TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
@@ -161,7 +164,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
assertEquals(1, recs.size());
for (SinkRecord rec : recs) {
SinkRecord referenceSinkRecord
- = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset);
+ = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset, TIMESTAMP, TIMESTAMP_TYPE);
assertEquals(referenceSinkRecord, rec);
offset++;
}
@@ -517,7 +520,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)
+ new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
)));
recordsReturned++;
return records;
@@ -546,7 +549,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)
+ new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
)));
recordsReturned++;
return records;
http://git-wip-us.apache.org/repos/asf/kafka/blob/44ad7b57/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index ab9863c..9854f22 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -392,6 +392,27 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
@Test
+ public void testSendRecordsPropagatesTimestamp() throws Exception {
+ final Long timestamp = System.currentTimeMillis();
+
+ createWorkerTask();
+
+ List<SourceRecord> records = Collections.singletonList(
+ new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+ );
+
+ Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+
+ PowerMock.replayAll();
+
+ Whitebox.setInternalState(workerTask, "toSend", records);
+ Whitebox.invokeMethod(workerTask, "sendRecords");
+ assertEquals(timestamp, sent.getValue().timestamp());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testSendRecordsRetries() throws Exception {
createWorkerTask();