Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 07:11:24 UTC
[05/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
deleted file mode 100644
index b270368..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConvertingFutureCallback;
-import org.apache.kafka.copycat.util.KafkaBasedLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * <p>
- * Implementation of OffsetBackingStore that uses a Kafka topic to store offset data.
- * </p>
- * <p>
- * Internally, this implementation both produces to and consumes from a Kafka topic which stores the offsets.
- * It accepts producer and consumer overrides via its configuration but forces some settings to specific values
- * to ensure correct behavior (e.g. acks, auto.offset.reset).
- * </p>
- */
-public class KafkaOffsetBackingStore implements OffsetBackingStore {
- private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
-
- public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
-
- private KafkaBasedLog<byte[], byte[]> offsetLog;
- private HashMap<ByteBuffer, ByteBuffer> data;
-
- @Override
- public void configure(Map<String, ?> configs) {
- String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
- if (topic == null)
- throw new CopycatException("Offset storage topic must be specified");
-
- data = new HashMap<>();
-
- Map<String, Object> producerProps = new HashMap<>();
- producerProps.putAll(configs);
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-
- Map<String, Object> consumerProps = new HashMap<>();
- consumerProps.putAll(configs);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
- offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
- }
-
- @Override
- public void start() {
- log.info("Starting KafkaOffsetBackingStore");
- offsetLog.start();
- log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore");
- }
-
- @Override
- public void stop() {
- log.info("Stopping KafkaOffsetBackingStore");
- offsetLog.stop();
- log.info("Stopped KafkaOffsetBackingStore");
- }
-
- @Override
- public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys,
- final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
- ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) {
- @Override
- public Map<ByteBuffer, ByteBuffer> convert(Void result) {
- Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
- for (ByteBuffer key : keys)
- values.put(key, data.get(key));
- return values;
- }
- };
- // This operation may be relatively (but not too) expensive since it always requires checking end offsets, even
- // if we've already read up to the end. However, it also should not be common (offsets should only be read when
- // resetting a task). Always requiring that we read to the end is simpler than trying to differentiate when it
- // is safe not to (which should only be if we *know* we've maintained ownership since the last write).
- offsetLog.readToEnd(future);
- return future;
- }
-
- @Override
- public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
- SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
-
- for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
- offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback);
-
- return producerCallback;
- }
-
- private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
- @Override
- public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
- ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
- ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
- data.put(key, value);
- }
- };
-
- private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
- Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
- }
-
- private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {
- private int numLeft;
- private boolean completed = false;
- private Throwable exception = null;
- private final Callback<Void> callback;
-
- public SetCallbackFuture(int numRecords, Callback<Void> callback) {
- numLeft = numRecords;
- this.callback = callback;
- }
-
- @Override
- public synchronized void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- if (!completed) {
- this.exception = exception;
- callback.onCompletion(exception, null);
- completed = true;
- this.notify();
- }
- return;
- }
-
- numLeft -= 1;
- if (numLeft == 0) {
- callback.onCompletion(null, null);
- completed = true;
- this.notify();
- }
- }
-
- @Override
- public synchronized boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public synchronized boolean isCancelled() {
- return false;
- }
-
- @Override
- public synchronized boolean isDone() {
- return completed;
- }
-
- @Override
- public synchronized Void get() throws InterruptedException, ExecutionException {
- while (!completed) {
- this.wait();
- }
- if (exception != null)
- throw new ExecutionException(exception);
- return null;
- }
-
- @Override
- public synchronized Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- long started = System.currentTimeMillis();
- long limit = started + unit.toMillis(timeout);
- while (!completed) {
- long leftMs = limit - System.currentTimeMillis();
- if (leftMs < 0)
- throw new TimeoutException("KafkaOffsetBackingStore Future timed out.");
- this.wait(leftMs);
- }
- if (exception != null)
- throw new ExecutionException(exception);
- return null;
- }
- }
-
-
-}
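
A minimal usage sketch of the KafkaOffsetBackingStore removed above, for reference only -- this is not part of the commit. The broker address and topic name are placeholder assumptions; the store itself forces acks=all and byte[] (de)serializers, so the caller only supplies connection settings and the offset.storage.topic value.

import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
import org.apache.kafka.copycat.util.Callback;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class KafkaOffsetBackingStoreExample {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        configs.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, "copycat-offsets");  // placeholder topic

        KafkaOffsetBackingStore store = new KafkaOffsetBackingStore();
        store.configure(configs);
        store.start();  // reads the existing topic contents into memory

        ByteBuffer key = ByteBuffer.wrap("task-0".getBytes(StandardCharsets.UTF_8));
        ByteBuffer value = ByteBuffer.wrap("{\"position\": 42}".getBytes(StandardCharsets.UTF_8));

        // set() is asynchronous; the callback fires once the producer has acked every record
        store.set(Collections.singletonMap(key, value), new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) {
                if (error != null)
                    error.printStackTrace();
            }
        }).get();

        // get() always re-reads the topic to its end before answering, so it sees the write above
        Map<ByteBuffer, ByteBuffer> read = store.get(Collections.singleton(key), null).get();
        System.out.println("offset bytes: " + read.get(key));

        store.stop();
    }
}
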
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
deleted file mode 100644
index 11a1b89..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this
- * behaves similarly to a real backing store, operations are executed asynchronously on a
- * background thread.
- */
-public class MemoryOffsetBackingStore implements OffsetBackingStore {
- private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
-
- protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
- protected ExecutorService executor = Executors.newSingleThreadExecutor();
-
- public MemoryOffsetBackingStore() {
-
- }
-
- @Override
- public void configure(Map<String, ?> props) {
- }
-
- @Override
- public synchronized void start() {
- }
-
- @Override
- public synchronized void stop() {
- // Nothing to do since this doesn't maintain any outstanding connections/data
- }
-
- @Override
- public Future<Map<ByteBuffer, ByteBuffer>> get(
- final Collection<ByteBuffer> keys,
- final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
- return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
- @Override
- public Map<ByteBuffer, ByteBuffer> call() throws Exception {
- Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
- synchronized (MemoryOffsetBackingStore.this) {
- for (ByteBuffer key : keys) {
- result.put(key, data.get(key));
- }
- }
- if (callback != null)
- callback.onCompletion(null, result);
- return result;
- }
- });
-
- }
-
- @Override
- public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values,
- final Callback<Void> callback) {
- return executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- synchronized (MemoryOffsetBackingStore.this) {
- for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
- data.put(entry.getKey(), entry.getValue());
- }
- save();
- }
- if (callback != null)
- callback.onCompletion(null, null);
- return null;
- }
- });
- }
-
- // Hook to allow subclasses to persist data
- protected void save() {
-
- }
-}
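
The save() hook at the end of the class above is the extension point for persistence. A hypothetical subclass, sketched below for illustration only (the file path and the use of Java serialization are assumptions, not part of this commit), could flush the protected data map to disk every time set() completes.

import org.apache.kafka.copycat.storage.MemoryOffsetBackingStore;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class FileBackedOffsetStore extends MemoryOffsetBackingStore {
    private final Path file = Paths.get("/tmp/copycat-offsets.bin");  // placeholder location

    @Override
    protected void save() {
        // save() is invoked from set() while synchronized on the store, so 'data' is stable here.
        // ByteBuffers are unwrapped into byte[] because they are not Serializable themselves.
        HashMap<byte[], byte[]> raw = new HashMap<>();
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : data.entrySet()) {
            byte[] key = entry.getKey() != null ? entry.getKey().array() : null;
            byte[] value = entry.getValue() != null ? entry.getValue().array() : null;
            raw.put(key, value);
        }
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(raw);
        } catch (IOException e) {
            throw new RuntimeException("Failed to persist offsets to " + file, e);
        }
    }
}
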
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
deleted file mode 100644
index 239d9a8..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.copycat.util.Callback;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-/**
- * <p>
- * OffsetBackingStore is an interface for storage backends that store key-value data. The backing
- * store doesn't need to handle serialization or deserialization. It only needs to support
- * reading/writing bytes. Since these operations are expected to require network
- * round trips, only bulk operations are supported.
- * </p>
- * <p>
- * Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances
- * that are associated with individual tasks, the caller must be sure keys include information about the
- * connector so that the shared namespace does not result in conflicting keys.
- * </p>
- */
-public interface OffsetBackingStore extends Configurable {
-
- /**
- * Start this offset store.
- */
- public void start();
-
- /**
- * Stop the backing store. Implementations should attempt to shutdown gracefully, but not block
- * indefinitely.
- */
- public void stop();
-
- /**
- * Get the values for the specified keys
- * @param keys list of keys to look up
- * @param callback callback to invoke on completion
- * @return future for the resulting map from key to value
- */
- public Future<Map<ByteBuffer, ByteBuffer>> get(
- Collection<ByteBuffer> keys,
- Callback<Map<ByteBuffer, ByteBuffer>> callback);
-
- /**
- * Set the specified keys and values.
- * @param values map from key to value
- * @param callback callback to invoke on completion
- * @return void future for the operation
- */
- public Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
- Callback<Void> callback);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
deleted file mode 100644
index 84229a5..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter which is implemented
- * directly, the interface is only separate from this implementation because it needs to be
- * included in the public API package.
- */
-public class OffsetStorageReaderImpl implements OffsetStorageReader {
- private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
-
- private final OffsetBackingStore backingStore;
- private final String namespace;
- private final Converter keyConverter;
- private final Converter valueConverter;
-
- public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
- Converter keyConverter, Converter valueConverter) {
- this.backingStore = backingStore;
- this.namespace = namespace;
- this.keyConverter = keyConverter;
- this.valueConverter = valueConverter;
- }
-
- @Override
- public <T> Map<String, Object> offset(Map<String, T> partition) {
- return offsets(Arrays.asList(partition)).get(partition);
- }
-
- @Override
- public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
- // Serialize keys so backing store can work with them
- Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(partitions.size());
- for (Map<String, T> key : partitions) {
- try {
- // Offsets are treated as schemaless; their format is only validated here (and on the returned value below)
- OffsetUtils.validateFormat(key);
- byte[] keySerialized = keyConverter.fromCopycatData(namespace, null, Arrays.asList(namespace, key));
- ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
- serializedToOriginal.put(keyBuffer, key);
- } catch (Throwable t) {
- log.error("CRITICAL: Failed to serialize partition key when getting offsets for task with "
- + "namespace {}. No value for this data will be returned, which may break the "
- + "task or cause it to skip some data.", namespace, t);
- }
- }
-
- // Get serialized key -> serialized value from backing store
- Map<ByteBuffer, ByteBuffer> raw;
- try {
- raw = backingStore.get(serializedToOriginal.keySet(), null).get();
- } catch (Exception e) {
- log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
- throw new CopycatException("Failed to fetch offsets.", e);
- }
-
- // Deserialize all the values and map back to the original keys
- Map<Map<String, T>, Map<String, Object>> result = new HashMap<>(partitions.size());
- for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
- try {
- // Since null could be a valid key, explicitly check whether map contains the key
- if (!serializedToOriginal.containsKey(rawEntry.getKey())) {
- log.error("Should be able to map {} back to a requested partition-offset key, backing "
- + "store may have returned invalid data", rawEntry.getKey());
- continue;
- }
- Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey());
- SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue() != null ? rawEntry.getValue().array() : null);
- Object deserializedValue = deserializedSchemaAndValue.value();
- OffsetUtils.validateFormat(deserializedValue);
-
- result.put(origKey, (Map<String, Object>) deserializedValue);
- } catch (Throwable t) {
- log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with"
- + " namespace {}. No value for this data will be returned, which may break the "
- + "task or cause it to skip some data. This could either be due to an error in "
- + "the connector implementation or incompatible schema.", namespace, t);
- }
- }
-
- return result;
- }
-}
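
As a reference point, here is a sketch of how a source task might use the reader above to recover its last committed position. This is not part of the commit: the backing store and converters are assumed to come from the framework, and the connector name and the partition/offset field names are placeholders.

import org.apache.kafka.copycat.storage.Converter;
import org.apache.kafka.copycat.storage.OffsetBackingStore;
import org.apache.kafka.copycat.storage.OffsetStorageReader;
import org.apache.kafka.copycat.storage.OffsetStorageReaderImpl;

import java.util.Collections;
import java.util.Map;

public class OffsetLookupExample {
    public static Long lastPosition(OffsetBackingStore store,
                                    Converter keyConverter,
                                    Converter valueConverter) {
        OffsetStorageReader reader =
                new OffsetStorageReaderImpl(store, "my-connector", keyConverter, valueConverter);

        // The partition map must match exactly what the task used when it committed the offset
        Map<String, String> partition = Collections.singletonMap("file", "input.txt");
        Map<String, Object> offset = reader.offset(partition);

        // null means no offset has been committed for this partition
        return offset == null ? null : (Long) offset.get("position");
    }
}
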
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
deleted file mode 100644
index 59c12a7..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.Future;
-
-/**
- * <p>
- * OffsetStorageWriter is a buffered writer that wraps the simple OffsetBackingStore interface.
- * It maintains a copy of the key-value data in memory and buffers writes. It allows you to take
- * a snapshot, which can then be asynchronously flushed to the backing store while new writes
- * continue to be processed. This allows Copycat to process offset commits in the background
- * while continuing to process messages.
- * </p>
- * <p>
- * Copycat uses an OffsetStorage implementation to save state about the current progress of
- * source (import to Kafka) jobs, which may have many input partitions and "offsets" may not be as
- * simple as they are for Kafka partitions or files. Offset storage is not required for sink jobs
- * because they can use Kafka's native offset storage (or the sink data store can handle offset
- * storage to achieve exactly once semantics).
- * </p>
- * <p>
- * Both partitions and offsets are generic data objects. This allows different connectors to use
- * whatever representation they need, even arbitrarily complex records. These are translated
- * internally into the serialized form the OffsetBackingStore uses.
- * </p>
- * <p>
- * Note that this only provides write functionality. This is intentional to ensure stale data is
- * never read. Offset data should only be read during startup or reconfiguration of a task. By
- * always serving those requests by reading the values from the backing store, we ensure we never
- * accidentally use stale data. (One example of how this can occur: a task is processing input
- * partition A, writing offsets; reconfiguration causes partition A to be reassigned elsewhere;
- * a later reconfiguration assigns partition A back to this node, but by then its copy of the offset data is out
- * of date). Since these offsets are created and managed by the connector itself, there's no way
- * for the offset management layer to know which keys are "owned" by which tasks at any given
- * time.
- * </p>
- * <p>
- * This class is not thread-safe. It should only be accessed from a Task's processing thread.
- * </p>
- */
-public class OffsetStorageWriter {
- private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
-
- private final OffsetBackingStore backingStore;
- private final Converter keyConverter;
- private final Converter valueConverter;
- private final String namespace;
- // Offset data in Copycat format
- private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
-
- // Not synchronized, should only be accessed by flush thread
- private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
- // Unique ID for each flush request to handle callbacks after timeouts
- private long currentFlushId = 0;
-
- public OffsetStorageWriter(OffsetBackingStore backingStore,
- String namespace, Converter keyConverter, Converter valueConverter) {
- this.backingStore = backingStore;
- this.namespace = namespace;
- this.keyConverter = keyConverter;
- this.valueConverter = valueConverter;
- }
-
- /**
- * Set an offset for a partition using Copycat data values
- * @param partition the partition to store an offset for
- * @param offset the offset
- */
- public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) {
- data.put((Map<String, Object>) partition, (Map<String, Object>) offset);
- }
-
- private boolean flushing() {
- return toFlush != null;
- }
-
- /**
- * Performs the first step of a flush operation, snapshotting the current state. This does not
- * actually initiate the flush with the underlying storage.
- *
- * @return true if a flush was initiated, false if no data was available
- */
- public synchronized boolean beginFlush() {
- if (flushing()) {
- log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
- + "framework should not allow this");
- throw new CopycatException("OffsetStorageWriter is already flushing");
- }
-
- if (data.isEmpty())
- return false;
-
- assert !flushing();
- toFlush = data;
- data = new HashMap<>();
- return true;
- }
-
- /**
- * Flush the current offsets and clear them from this writer. This is non-blocking: it
- * moves the current set of offsets out of the way, serializes the data, and asynchronously
- * writes the data to the backing store. If no offsets need to be written, the callback is
- * still invoked, but no Future is returned.
- *
- * @return a Future, or null if there are no offsets to commit
- */
- public Future<Void> doFlush(final Callback<Void> callback) {
- final long flushId = currentFlushId;
-
- // Serialize
- Map<ByteBuffer, ByteBuffer> offsetsSerialized;
- try {
- offsetsSerialized = new HashMap<>();
- for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
- // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
- // for that data. The only enforcement of the format is here.
- OffsetUtils.validateFormat(entry.getKey());
- OffsetUtils.validateFormat(entry.getValue());
- // When serializing the key, we add in the namespace information so the key is [namespace, real key]
- byte[] key = keyConverter.fromCopycatData(namespace, null, Arrays.asList(namespace, entry.getKey()));
- ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
- byte[] value = valueConverter.fromCopycatData(namespace, null, entry.getValue());
- ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
- offsetsSerialized.put(keyBuffer, valueBuffer);
- }
- } catch (Throwable t) {
- // Must handle errors properly here or the writer will be left mid-flush forever and be
- // unable to make progress.
- log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
- + "offsets under namespace {}. This likely won't recover unless the "
- + "unserializable partition or offset information is overwritten.", namespace);
- log.error("Cause of serialization failure:", t);
- callback.onCompletion(t, null);
- return null;
- }
-
- // And submit the data
- log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
- return backingStore.set(offsetsSerialized, new Callback<Void>() {
- @Override
- public void onCompletion(Throwable error, Void result) {
- boolean isCurrent = handleFinishWrite(flushId, error, result);
- if (isCurrent && callback != null)
- callback.onCompletion(error, result);
- }
- });
- }
-
- /**
- * Cancel a flush that has been initiated by {@link #beginFlush}. This should not be called if
- * {@link #doFlush} has already been invoked. It should be used if an operation performed
- * between beginFlush and doFlush failed.
- */
- public synchronized void cancelFlush() {
- // Verify we're still flushing data to handle a race between cancelFlush() calls from up the
- // call stack and callbacks from the write request to underlying storage
- if (flushing()) {
- // Just recombine the data and place it back in the primary storage
- toFlush.putAll(data);
- data = toFlush;
- currentFlushId++;
- toFlush = null;
- }
- }
-
- /**
- * Handle completion of a write. Returns true if this callback is for the current flush
- * operation, false if it's for an old operation that should now be ignored.
- */
- private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
- // Callbacks need to be handled carefully since the flush operation may have already timed
- // out and been cancelled.
- if (flushId != currentFlushId)
- return false;
-
- if (error != null) {
- cancelFlush();
- } else {
- currentFlushId++;
- toFlush = null;
- }
- return true;
- }
-}
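
The beginFlush()/doFlush()/cancelFlush() protocol described in the class comment above can be summarized with a short sketch, not part of this commit. The backing store and converters are assumed to be provided by the framework; the connector name, the partition and offset maps, and the five-second timeout are placeholders.

import org.apache.kafka.copycat.storage.Converter;
import org.apache.kafka.copycat.storage.OffsetBackingStore;
import org.apache.kafka.copycat.storage.OffsetStorageWriter;
import org.apache.kafka.copycat.util.Callback;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class OffsetFlushExample {
    public static void commitOffsets(OffsetBackingStore store,
                                     Converter keyConverter,
                                     Converter valueConverter) throws Exception {
        OffsetStorageWriter writer =
                new OffsetStorageWriter(store, "my-connector", keyConverter, valueConverter);

        // The task records progress as schemaless maps while it processes data
        Map<String, Object> partition = Collections.<String, Object>singletonMap("file", "input.txt");
        Map<String, Object> offset = Collections.<String, Object>singletonMap("position", 42L);
        writer.offset(partition, offset);

        // Step 1: snapshot the accumulated offsets; returns false when there is nothing to flush
        if (!writer.beginFlush())
            return;

        try {
            // Step 2: serialize the snapshot and hand it to the backing store asynchronously
            Future<Void> flush = writer.doFlush(new Callback<Void>() {
                @Override
                public void onCompletion(Throwable error, Void result) {
                    if (error != null)
                        System.err.println("Offset commit failed: " + error);
                }
            });
            if (flush != null)
                flush.get(5, TimeUnit.SECONDS);  // wait for the backing store to ack the write
        } catch (Exception e) {
            // Step 3 (failure path): put the snapshot back so the offsets are retried on the next flush
            writer.cancelFlush();
            throw e;
        }
    }
}
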
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
deleted file mode 100644
index 9ba7662..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.data.CopycatSchema;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.Map;
-
-public class OffsetUtils {
- public static void validateFormat(Object offsetData) {
- if (offsetData == null)
- return;
-
- if (!(offsetData instanceof Map))
- throw new DataException("Offsets must be specified as a Map");
- validateFormat((Map<Object, Object>) offsetData);
- }
-
- public static <K, V> void validateFormat(Map<K, V> offsetData) {
- // Both keys and values for offsets may be null. For values, this is a useful way to delete offsets or indicate
- // that there's no usable concept of offsets in your source system.
- if (offsetData == null)
- return;
-
- for (Map.Entry<K, V> entry : offsetData.entrySet()) {
- if (!(entry.getKey() instanceof String))
- throw new DataException("Offsets may only use String keys");
-
- Object value = entry.getValue();
- if (value == null)
- continue;
- Schema.Type schemaType = CopycatSchema.schemaType(value.getClass());
- if (!schemaType.isPrimitive())
- throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
deleted file mode 100644
index 5cf1423..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-/**
- * Generic interface for callbacks
- */
-public interface Callback<V> {
- /**
- * Invoked upon completion of the operation.
- *
- * @param error the error that caused the operation to fail, or null if no error occurred
- * @param result the return value, or null if the operation failed
- */
- void onCompletion(Throwable error, V result);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
deleted file mode 100644
index d4cf824..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.Serializable;
-
-/**
- * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
- * the connector.
- */
-public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> {
- private final String connector;
- private final int task;
-
- @JsonCreator
- public ConnectorTaskId(@JsonProperty("connector") String connector, @JsonProperty("task") int task) {
- this.connector = connector;
- this.task = task;
- }
-
- @JsonProperty
- public String connector() {
- return connector;
- }
-
- @JsonProperty
- public int task() {
- return task;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- ConnectorTaskId that = (ConnectorTaskId) o;
-
- if (task != that.task)
- return false;
- if (connector != null ? !connector.equals(that.connector) : that.connector != null)
- return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = connector != null ? connector.hashCode() : 0;
- result = 31 * result + task;
- return result;
- }
-
- @Override
- public String toString() {
- return connector + '-' + task;
- }
-
- @Override
- public int compareTo(ConnectorTaskId o) {
- int connectorCmp = connector.compareTo(o.connector);
- if (connectorCmp != 0)
- return connectorCmp;
- return ((Integer) task).compareTo(o.task);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
deleted file mode 100644
index 862adf9..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
-
- private Callback<T> underlying;
- private CountDownLatch finishedLatch;
- private T result = null;
- private Throwable exception = null;
-
- public ConvertingFutureCallback(Callback<T> underlying) {
- this.underlying = underlying;
- this.finishedLatch = new CountDownLatch(1);
- }
-
- public abstract T convert(U result);
-
- @Override
- public void onCompletion(Throwable error, U result) {
- this.exception = error;
- this.result = convert(result);
- if (underlying != null)
- underlying.onCompletion(error, this.result);
- finishedLatch.countDown();
- }
-
- @Override
- public boolean cancel(boolean b) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return finishedLatch.getCount() == 0;
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- finishedLatch.await();
- return result();
- }
-
- @Override
- public T get(long l, TimeUnit timeUnit)
- throws InterruptedException, ExecutionException, TimeoutException {
- if (!finishedLatch.await(l, timeUnit))
- throw new TimeoutException("Timed out waiting for future");
- return result();
- }
-
- private T result() throws ExecutionException {
- if (exception != null) {
- throw new ExecutionException(exception);
- }
- return result;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
deleted file mode 100644
index 269482c..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-public class FutureCallback<T> extends ConvertingFutureCallback<T, T> {
-
- public FutureCallback(Callback<T> underlying) {
- super(underlying);
- }
-
- public FutureCallback() {
- super(null);
- }
-
- @Override
- public T convert(T result) {
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
deleted file mode 100644
index f5e72d3..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-/**
- * <p>
- * KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all
- * clients need to consume and, at times, agree on how far they have read, i.e. that they have read to the end of the log.
- * </p>
- * <p>
- * This functionality is useful for storing different types of data that all clients may need to agree on --
- * offsets or config for example. This class runs a consumer in a background thread to continuously tail the target
- * topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful
- * utilities like checking the current log end offset and waiting until the current end of the log is reached.
- * </p>
- * <p>
- * To support different use cases, this class works with either single- or multi-partition topics.
- * </p>
- * <p>
- * Since this class is generic, it delegates the details of data storage via a callback that is invoked for each
- * record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the
- * calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
- * and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
- * </p>
- */
-public class KafkaBasedLog<K, V> {
- private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
- private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
-
- private Time time;
- private final String topic;
- private final Map<String, Object> producerConfigs;
- private final Map<String, Object> consumerConfigs;
- private final Callback<ConsumerRecord<K, V>> consumedCallback;
- private Consumer<K, V> consumer;
- private Producer<K, V> producer;
-
- private Thread thread;
- private boolean stopRequested;
- private Queue<Callback<Void>> readLogEndOffsetCallbacks;
-
- /**
- * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
- * {@link #start()} is invoked.
- *
- * @param topic the topic to treat as a log
- * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
- * contain compatible serializer settings for the generic types used on this class. Some
- * setting, such as the number of acks, will be overridden to ensure correct behavior of this
- * class.
- * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
- * contain compatible serializer settings for the generic types used on this class. Some
- * setting, such as the auto offset reset policy, will be overridden to ensure correct
- * behavior of this class.
- * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
- * @param time Time interface
- */
- public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
- Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
- this.topic = topic;
- this.producerConfigs = producerConfigs;
- this.consumerConfigs = consumerConfigs;
- this.consumedCallback = consumedCallback;
- this.stopRequested = false;
- this.readLogEndOffsetCallbacks = new ArrayDeque<>();
- this.time = time;
- }
-
- public void start() {
- log.info("Starting KafkaBasedLog with topic " + topic);
-
- producer = createProducer();
- consumer = createConsumer();
-
- List<TopicPartition> partitions = new ArrayList<>();
-
- // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
- // we rely on topic auto-creation
- List<PartitionInfo> partitionInfos = null;
- long started = time.milliseconds();
- while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
- partitionInfos = consumer.partitionsFor(topic);
- Utils.sleep(Math.min(time.milliseconds() - started, 1000));
- }
- if (partitionInfos == null)
- throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
- " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
- " this is your first use of the topic it may have taken too long to create.");
-
- for (PartitionInfo partition : partitionInfos)
- partitions.add(new TopicPartition(partition.topic(), partition.partition()));
- consumer.assign(partitions);
-
- readToLogEnd();
-
- thread = new WorkThread();
- thread.start();
-
- log.info("Finished reading KafakBasedLog for topic " + topic);
-
- log.info("Started KafakBasedLog for topic " + topic);
- }
-
- public void stop() {
- log.info("Stopping KafkaBasedLog for topic " + topic);
-
- synchronized (this) {
- stopRequested = true;
- }
- consumer.wakeup();
-
- try {
- thread.join();
- } catch (InterruptedException e) {
- throw new CopycatException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
- "down it's producer and consumer.", e);
- }
-
- try {
- producer.close();
- } catch (KafkaException e) {
- log.error("Failed to stop KafkaBasedLog producer", e);
- }
-
- try {
- consumer.close();
- } catch (KafkaException e) {
- log.error("Failed to stop KafkaBasedLog consumer", e);
- }
-
- log.info("Stopped KafkaBasedLog for topic " + topic);
- }
-
- /**
- * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback.
- * Note that this checks the current end offsets, reads up to them, and invokes the callback regardless of whether
- * additional records have been written to the log. If the caller needs to ensure they have truly reached the end
- * of the log, they must ensure there are no other writers during this period.
- *
- * This waits until the end of all partitions has been reached.
- *
- * This method is asynchronous. If you need a synchronous version, pass an instance of
- * {@link org.apache.kafka.copycat.util.FutureCallback} as the {@code callback} parameter and wait on it to block.
- *
- * @param callback the callback to invoke once the end of the log has been reached.
- */
- public void readToEnd(Callback<Void> callback) {
- producer.flush();
- synchronized (this) {
- readLogEndOffsetCallbacks.add(callback);
- }
- consumer.wakeup();
- }
-
- /**
- * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
- * @return the future associated with the operation
- */
- public Future<Void> readToEnd() {
- FutureCallback<Void> future = new FutureCallback<>(null);
- readToEnd(future);
- return future;
- }
-
- public void send(K key, V value) {
- send(key, value, null);
- }
-
- public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
- producer.send(new ProducerRecord<>(topic, key, value), callback);
- }
-
-
- private Producer<K, V> createProducer() {
- // Always require producer acks to all to ensure durable writes
- producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
- return new KafkaProducer<>(producerConfigs);
- }
-
- private Consumer<K, V> createConsumer() {
- // Always force reset to the beginning of the log since this class wants to consume all available log data
- consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- return new KafkaConsumer<>(consumerConfigs);
- }
-
- private void poll(long timeoutMs) {
- try {
- ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
- for (ConsumerRecord<K, V> record : records)
- consumedCallback.onCompletion(null, record);
- } catch (WakeupException e) {
- // Expected on get() or stop(). The calling code should handle this
- throw e;
- } catch (KafkaException e) {
- log.error("Error polling: " + e);
- }
- }
-
- private void readToLogEnd() {
- log.trace("Reading to end of offset log");
-
- Set<TopicPartition> assignment = consumer.assignment();
-
- // This approach to getting the current end offset is hacky until we have an API for looking these up directly
- Map<TopicPartition, Long> offsets = new HashMap<>();
- for (TopicPartition tp : assignment) {
- long offset = consumer.position(tp);
- offsets.put(tp, offset);
- consumer.seekToEnd(tp);
- }
-
- Map<TopicPartition, Long> endOffsets = new HashMap<>();
- try {
- poll(0);
- } finally {
- // If there is an exception, even a possibly expected one like WakeupException, we need to make sure
- // the consumer's position is reset or it'll get into an inconsistent state.
- for (TopicPartition tp : assignment) {
- long startOffset = offsets.get(tp);
- long endOffset = consumer.position(tp);
- if (endOffset > startOffset) {
- endOffsets.put(tp, endOffset);
- consumer.seek(tp, startOffset);
- }
- log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
- }
- }
-
- while (!endOffsets.isEmpty()) {
- poll(Integer.MAX_VALUE);
-
- Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<TopicPartition, Long> entry = it.next();
- if (consumer.position(entry.getKey()) >= entry.getValue())
- it.remove();
- else
- break;
- }
- }
- }
-
-
- private class WorkThread extends Thread {
- @Override
- public void run() {
- try {
- while (true) {
- int numCallbacks;
- synchronized (KafkaBasedLog.this) {
- if (stopRequested)
- break;
- numCallbacks = readLogEndOffsetCallbacks.size();
- }
-
- if (numCallbacks > 0) {
- try {
- readToLogEnd();
- } catch (WakeupException e) {
- // Either received another get() call and need to retry reading to end of log or stop() was
- // called. Both are handled by restarting this loop.
- continue;
- }
- }
-
- synchronized (KafkaBasedLog.this) {
- // Only invoke exactly the number of callbacks we found before triggering the read to log end
- // since it is possible for another write + readToEnd to sneak in in the meantime
- for (int i = 0; i < numCallbacks; i++) {
- Callback<Void> cb = readLogEndOffsetCallbacks.poll();
- cb.onCompletion(null, null);
- }
- }
-
- try {
- poll(Integer.MAX_VALUE);
- } catch (WakeupException e) {
- // See previous comment, both possible causes of this wakeup are handled by starting this loop again
- continue;
- }
- }
- } catch (Throwable t) {
- log.error("Unexpected exception in KafkaBasedLog's work thread", t);
- }
- }
- }
-}
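
Taken together, the class above is a small, reusable building block; a hedged usage sketch follows, not part of this commit. The topic name, broker address, and String serializers are placeholder assumptions -- the real caller in this patch, KafkaOffsetBackingStore earlier in this message, wires it up with byte[] serializers instead.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.KafkaBasedLog;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class KafkaBasedLogExample {
    public static void main(String[] args) throws Exception {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("enable.auto.commit", false);  // the log tracks its own position

        // Materialized view of the log; updated only from the consumed callback, which runs serially
        final Map<String, String> state = new ConcurrentHashMap<>();
        Callback<ConsumerRecord<String, String>> consumed = new Callback<ConsumerRecord<String, String>>() {
            @Override
            public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
                if (error != null || record.key() == null)
                    return;
                if (record.value() == null)
                    state.remove(record.key());  // tombstone
                else
                    state.put(record.key(), record.value());
            }
        };

        KafkaBasedLog<String, String> log = new KafkaBasedLog<>(
                "example-config-log", producerProps, consumerProps, consumed, new SystemTime());

        log.start();                      // reads the existing log to its end before returning
        log.send("cluster.id", "demo");   // acks=all is forced internally for durability
        log.readToEnd().get();            // block until the background thread has read our write back
        System.out.println("cluster.id = " + state.get("cluster.id"));
        log.stop();
    }
}
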
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
deleted file mode 100644
index 3e23f29..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * <p>
- * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown,
- * a flag is set, which the thread should detect and try to exit gracefully from. In forcible
- * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit
- * gracefully, but then force it to exit if it takes too long.
- * </p>
- * <p>
- * Implementations should override the {@link #execute} method and check {@link #getRunning} to
- * determine whether they should try to gracefully exit.
- * </p>
- */
-public abstract class ShutdownableThread extends Thread {
- private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class);
-
- private AtomicBoolean isRunning = new AtomicBoolean(true);
- private CountDownLatch shutdownLatch = new CountDownLatch(1);
-
- /**
- * An UncaughtExceptionHandler to register on every instance of this class. This is useful for
- * testing, where assertion failures in the thread may not cause the test to fail. Since one
- * instance is used for all threads, it must be thread-safe.
- */
- volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null;
-
- public ShutdownableThread(String name) {
- // The default is daemon=true so that these threads will not prevent shutdown. We use this
- // default because these threads may be running user code that does not clean up properly, even
- // when we attempt to forcibly shut them down.
- this(name, true);
- }
-
- public ShutdownableThread(String name, boolean daemon) {
- super(name);
- this.setDaemon(daemon);
- if (funcaughtExceptionHandler != null)
- this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
- }
-
- /**
- * Implementations should override this method with the main body for the thread.
- */
- public abstract void execute();
-
- /**
- * Returns true if the thread hasn't exited yet and none of the shutdown methods have been
- * invoked.
- */
- public boolean getRunning() {
- return isRunning.get();
- }
-
- @Override
- public void run() {
- try {
- execute();
- } catch (Error | RuntimeException e) {
- log.error("Thread {} exiting with uncaught exception: ", getName(), e);
- throw e;
- } finally {
- shutdownLatch.countDown();
- }
- }
-
- /**
- * Shutdown the thread, first trying to shut down gracefully using the specified timeout, then
- * forcibly interrupting the thread.
- * @param gracefulTimeout the maximum time to wait for a graceful exit
- * @param unit the time unit of the timeout argument
- */
- public void shutdown(long gracefulTimeout, TimeUnit unit)
- throws InterruptedException {
- boolean success = gracefulShutdown(gracefulTimeout, unit);
- if (!success)
- forceShutdown();
- }
-
- /**
- * Attempt graceful shutdown
- * @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
- * @return true if successful, false if the timeout elapsed
- */
- public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException {
- startGracefulShutdown();
- return awaitShutdown(timeout, unit);
- }
-
- /**
- * Start shutting down this thread gracefully, but do not block waiting for it to exit.
- */
- public void startGracefulShutdown() {
- log.info("Starting graceful shutdown of thread {}", getName());
- isRunning.set(false);
- }
-
- /**
- * Awaits shutdown of this thread, waiting up to the timeout.
- * @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
- * @return true if successful, false if the timeout elapsed
- * @throws InterruptedException if the current thread is interrupted while waiting
- */
- public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
- return shutdownLatch.await(timeout, unit);
- }
-
- /**
- * Immediately tries to force the thread to shut down by interrupting it. This does not try to
- * wait for the thread to truly exit because forcible shutdown is not always possible. By
- * default, threads are marked as daemon threads so they will not prevent the process from
- * exiting.
- */
- public void forceShutdown() throws InterruptedException {
- log.info("Forcing shutdown of thread {}", getName());
- isRunning.set(false);
- interrupt();
- }
-}
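For reference, the removed ShutdownableThread was meant to be used by overriding execute() and polling getRunning() inside the work loop, with the owner driving shutdown from outside. A minimal hypothetical subclass (the name and loop body are illustrative only):

    // Hypothetical subclass; daemon by default, per the single-argument constructor above.
    public class PollingThread extends ShutdownableThread {
        public PollingThread() {
            super("polling-thread");
        }

        @Override
        public void execute() {
            while (getRunning()) {
                // Perform one bounded unit of work and return to the loop promptly so a
                // graceful shutdown request (isRunning flipped to false) is noticed.
            }
        }
    }

The owner would typically call shutdown(5, TimeUnit.SECONDS), or similar, to give the loop a chance to observe getRunning() returning false before interrupt() is used as a last resort.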
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
deleted file mode 100644
index 0458054..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.RetriableException;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkRecord;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.storage.Converter;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.MockTime;
-import org.easymock.Capture;
-import org.easymock.CaptureType;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(WorkerSinkTask.class)
-@PowerMockIgnore("javax.management.*")
-public class WorkerSinkTaskTest {
- // These are fixed to keep this code simpler. In this example we assume byte[] raw values
- // with a mix of integer/string data in Copycat
- private static final String TOPIC = "test";
- private static final int PARTITION = 12;
- private static final int PARTITION2 = 13;
- private static final long FIRST_OFFSET = 45;
- private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
- private static final int KEY = 12;
- private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
- private static final String VALUE = "VALUE";
- private static final byte[] RAW_KEY = "key".getBytes();
- private static final byte[] RAW_VALUE = "value".getBytes();
-
- private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
- private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
-
- private static final Map<String, String> TASK_PROPS = new HashMap<>();
- static {
- TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
- }
-
-
- private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
- private Time time;
- private WorkerSinkTask workerTask;
- @Mock
- private SinkTask sinkTask;
- private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
- private WorkerConfig workerConfig;
- @Mock
- private Converter keyConverter;
- @Mock
- private Converter valueConverter;
- @Mock
- private WorkerSinkTaskThread workerThread;
- @Mock
- private KafkaConsumer<byte[], byte[]> consumer;
- private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
-
- private long recordsReturned;
-
- @Before
- public void setUp() {
- time = new MockTime();
- Map<String, String> workerProps = new HashMap<>();
- workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.put("internal.key.converter.schemas.enable", "false");
- workerProps.put("internal.value.converter.schemas.enable", "false");
- workerConfig = new StandaloneConfig(workerProps);
- workerTask = PowerMock.createPartialMock(
- WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
- taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
-
- recordsReturned = 0;
- }
-
- @Test
- public void testPollRedelivery() throws Exception {
- expectInitializeTask();
-
- // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
- expectConsumerPoll(1);
- expectConvertMessages(1);
- Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
- sinkTask.put(EasyMock.capture(records));
- EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
- // Pause
- EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
- consumer.pause(TOPIC_PARTITION);
- PowerMock.expectLastCall();
- consumer.pause(TOPIC_PARTITION2);
- PowerMock.expectLastCall();
-
- // Retry delivery should succeed
- expectConsumerPoll(0);
- sinkTask.put(EasyMock.capture(records));
- EasyMock.expectLastCall();
- // And unpause
- EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
- consumer.resume(TOPIC_PARTITION);
- PowerMock.expectLastCall();
- consumer.resume(TOPIC_PARTITION2);
- PowerMock.expectLastCall();
-
- PowerMock.replayAll();
-
- workerTask.start(TASK_PROPS);
- workerTask.joinConsumerGroupAndStart();
- workerTask.poll(Long.MAX_VALUE);
- workerTask.poll(Long.MAX_VALUE);
-
- PowerMock.verifyAll();
- }
-
-
- private void expectInitializeTask() throws Exception {
- PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
- PowerMock.expectPrivate(workerTask, "createWorkerThread")
- .andReturn(workerThread);
- workerThread.start();
- PowerMock.expectLastCall();
-
- consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
- PowerMock.expectLastCall();
-
- EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
- @Override
- public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
- rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
- return ConsumerRecords.empty();
- }
- });
- EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
- EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-
- sinkTask.initialize(EasyMock.capture(sinkTaskContext));
- PowerMock.expectLastCall();
- sinkTask.start(TASK_PROPS);
- PowerMock.expectLastCall();
- }
-
- private void expectConsumerPoll(final int numMessages) {
- EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
- new IAnswer<ConsumerRecords<byte[], byte[]>>() {
- @Override
- public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
- List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- for (int i = 0; i < numMessages; i++)
- records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, RAW_KEY, RAW_VALUE));
- recordsReturned += numMessages;
- return new ConsumerRecords<>(
- numMessages > 0 ?
- Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :
- Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap()
- );
- }
- });
- }
-
- private void expectConvertMessages(final int numMessages) {
- EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
- EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
- }
-}
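The testPollRedelivery case above verifies the worker's handling of RetriableException from put(): pause every assigned partition, re-deliver the same batch on the next poll, and resume once delivery succeeds. A condensed hypothetical sketch of that flow (the class and field names are assumed, not the real WorkerSinkTask code):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.copycat.errors.RetriableException;
    import org.apache.kafka.copycat.sink.SinkRecord;
    import org.apache.kafka.copycat.sink.SinkTask;

    import java.util.Collection;

    public class RetryingDelivery {
        private final SinkTask sinkTask;
        private final KafkaConsumer<byte[], byte[]> consumer;

        public RetryingDelivery(SinkTask sinkTask, KafkaConsumer<byte[], byte[]> consumer) {
            this.sinkTask = sinkTask;
            this.consumer = consumer;
        }

        /** Returns true if delivered; false if the same batch must be retried on the next poll. */
        public boolean deliver(Collection<SinkRecord> batch) {
            try {
                sinkTask.put(batch);
            } catch (RetriableException e) {
                // Pause all assigned partitions so subsequent polls return no new records
                // while this batch is retried.
                for (TopicPartition tp : consumer.assignment())
                    consumer.pause(tp);
                return false;
            }
            // Delivery succeeded: resume consumption on all assigned partitions.
            for (TopicPartition tp : consumer.assignment())
                consumer.resume(tp);
            return true;
        }
    }

This mirrors what the expectations in testPollRedelivery assert: one pause per partition after the failed put(), and one resume per partition after the successful retry.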