Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:57 UTC
[07/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
deleted file mode 100644
index cf39606..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for all fetchers, which implement the connections to Kafka brokers and
- * pull records from Kafka partitions.
- *
- * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
- * as well as around the optional timestamp assignment and watermark generation.
- *
- * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
- * the Flink data streams.
- * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
- */
-public abstract class AbstractFetcher<T, KPH> {
-
- protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
- protected static final int PERIODIC_WATERMARKS = 1;
- protected static final int PUNCTUATED_WATERMARKS = 2;
-
- // ------------------------------------------------------------------------
-
- /** The source context to emit records and watermarks to */
- protected final SourceContext<T> sourceContext;
-
- /** The lock that guarantees that record emission and state updates are atomic,
- * from the view of taking a checkpoint */
- protected final Object checkpointLock;
-
- /** All partitions (and their state) that this fetcher is subscribed to */
- private final KafkaTopicPartitionState<KPH>[] allPartitions;
-
- /** The mode describing whether the fetcher also generates timestamps and watermarks */
- protected final int timestampWatermarkMode;
-
- /** Flag whether to register metrics for the fetcher */
- protected final boolean useMetrics;
-
- /** Only relevant for punctuated watermarks: The current cross partition watermark */
- private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
-
- // ------------------------------------------------------------------------
-
- protected AbstractFetcher(
- SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- ProcessingTimeService processingTimeProvider,
- long autoWatermarkInterval,
- ClassLoader userCodeClassLoader,
- boolean useMetrics) throws Exception
- {
- this.sourceContext = checkNotNull(sourceContext);
- this.checkpointLock = sourceContext.getCheckpointLock();
- this.useMetrics = useMetrics;
-
-		// figure out what watermark mode we will be using
-
- if (watermarksPeriodic == null) {
- if (watermarksPunctuated == null) {
- // simple case, no watermarks involved
- timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
- } else {
- timestampWatermarkMode = PUNCTUATED_WATERMARKS;
- }
- } else {
- if (watermarksPunctuated == null) {
- timestampWatermarkMode = PERIODIC_WATERMARKS;
- } else {
- throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
- }
- }
-
- // create our partition state according to the timestamp/watermark mode
- this.allPartitions = initializePartitions(
- assignedPartitions,
- timestampWatermarkMode,
- watermarksPeriodic, watermarksPunctuated,
- userCodeClassLoader);
-
- // if we have periodic watermarks, kick off the interval scheduler
- if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
- (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
-
- PeriodicWatermarkEmitter periodicEmitter =
- new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
- periodicEmitter.start();
- }
- }
-
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- /**
- * Gets all partitions (with partition state) that this fetcher is subscribed to.
- *
- * @return All subscribed partitions.
- */
- protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
- return allPartitions;
- }
-
- // ------------------------------------------------------------------------
- // Core fetcher work methods
- // ------------------------------------------------------------------------
-
- public abstract void runFetchLoop() throws Exception;
-
- public abstract void cancel();
-
- // ------------------------------------------------------------------------
- // Kafka version specifics
- // ------------------------------------------------------------------------
-
- /**
- * Creates the Kafka version specific representation of the given
- * topic partition.
- *
- * @param partition The Flink representation of the Kafka topic partition.
- * @return The specific Kafka representation of the Kafka topic partition.
- */
- public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
-
- /**
- * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
- * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
- * the last processed record of each partition. Version-specific implementations of this method
- * need to honor the contract that the given offsets must be incremented by 1 before
- * committing them, so that committed offsets to Kafka represent "the next record to process".
- *
- * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
- * @throws Exception This method forwards exceptions.
- */
- public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
-
- // ------------------------------------------------------------------------
- // snapshot and restore the state
- // ------------------------------------------------------------------------
-
- /**
- * Takes a snapshot of the partition offsets.
- *
- * <p>Important: This method must be called under the checkpoint lock.
- *
- * @return A map from partition to current offset.
- */
- public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
- // this method assumes that the checkpoint lock is held
- assert Thread.holdsLock(checkpointLock);
-
- HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
- for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
- state.put(partition.getKafkaTopicPartition(), partition.getOffset());
- }
- return state;
- }
-
- /**
- * Restores the partition offsets.
- *
- * @param snapshotState The offsets for the partitions
- */
- public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
- for (KafkaTopicPartitionState<?> partition : allPartitions) {
- Long offset = snapshotState.get(partition.getKafkaTopicPartition());
- if (offset != null) {
- partition.setOffset(offset);
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // emitting records
- // ------------------------------------------------------------------------
-
- /**
- * Emits a record without attaching an existing timestamp to it.
- *
- * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
- * That keeps the fast path efficient; the extended paths are called as separate methods.
- *
- * @param record The record to emit
- * @param partitionState The state of the Kafka partition from which the record was fetched
- * @param offset The offset of the record
- */
- protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
- if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
- // fast path logic, in case there are no watermarks
-
- // emit the record, using the checkpoint lock to guarantee
- // atomicity of record emission and offset state update
- synchronized (checkpointLock) {
- sourceContext.collect(record);
- partitionState.setOffset(offset);
- }
- }
- else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
- }
- else {
- emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
- }
- }
-
- /**
- * Emits a record attaching a timestamp to it.
- *
- * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
- * That keeps the fast path efficient; the extended paths are called as separate methods.
- *
- * @param record The record to emit
- * @param partitionState The state of the Kafka partition from which the record was fetched
- * @param offset The offset of the record
- */
- protected void emitRecordWithTimestamp(
- T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
-
- if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
- // fast path logic, in case there are no watermarks generated in the fetcher
-
- // emit the record, using the checkpoint lock to guarantee
- // atomicity of record emission and offset state update
- synchronized (checkpointLock) {
- sourceContext.collectWithTimestamp(record, timestamp);
- partitionState.setOffset(offset);
- }
- }
- else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
- }
- else {
- emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
- }
- }
-
- /**
- * Record emission for the case where a timestamp is attached by an assigner
- * that is also a periodic watermark generator.
- */
- protected void emitRecordWithTimestampAndPeriodicWatermark(
- T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
- {
- @SuppressWarnings("unchecked")
- final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
- (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
- // extract timestamp - this accesses/modifies the per-partition state inside the
- // watermark generator instance, so we need to lock the access on the
- // partition state. concurrent access can happen from the periodic emitter
- final long timestamp;
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (withWatermarksState) {
- timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
- }
-
- // emit the record with timestamp, using the usual checkpoint lock to guarantee
- // atomicity of record emission and offset state update
- synchronized (checkpointLock) {
- sourceContext.collectWithTimestamp(record, timestamp);
- partitionState.setOffset(offset);
- }
- }
-
- /**
- * Record emission for the case where a timestamp is attached by an assigner
- * that is also a punctuated watermark generator.
- */
- protected void emitRecordWithTimestampAndPunctuatedWatermark(
- T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
- {
- @SuppressWarnings("unchecked")
- final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
- (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
- // only one thread ever works on accessing timestamps and watermarks
- // from the punctuated extractor
- final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
- final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
- // emit the record with timestamp, using the usual checkpoint lock to guarantee
- // atomicity of record emission and offset state update
- synchronized (checkpointLock) {
- sourceContext.collectWithTimestamp(record, timestamp);
- partitionState.setOffset(offset);
- }
-
- // if we also have a new per-partition watermark, check if that is also a
- // new cross-partition watermark
- if (newWatermark != null) {
- updateMinPunctuatedWatermark(newWatermark);
- }
- }
-
- /**
- * Checks whether a new per-partition watermark is also a new cross-partition watermark.
- */
- private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
- if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
- long newMin = Long.MAX_VALUE;
-
- for (KafkaTopicPartitionState<?> state : allPartitions) {
- @SuppressWarnings("unchecked")
- final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
- (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
-
- newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
- }
-
- // double-check locking pattern
- if (newMin > maxWatermarkSoFar) {
- synchronized (checkpointLock) {
- if (newMin > maxWatermarkSoFar) {
- maxWatermarkSoFar = newMin;
- sourceContext.emitWatermark(new Watermark(newMin));
- }
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Utility method that takes the topic partitions and creates the topic partition state
- * holders. If a watermark generator per partition exists, this will also initialize those.
- */
- private KafkaTopicPartitionState<KPH>[] initializePartitions(
- List<KafkaTopicPartition> assignedPartitions,
- int timestampWatermarkMode,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- ClassLoader userCodeClassLoader)
- throws IOException, ClassNotFoundException
- {
- switch (timestampWatermarkMode) {
-
- case NO_TIMESTAMPS_WATERMARKS: {
- @SuppressWarnings("unchecked")
- KafkaTopicPartitionState<KPH>[] partitions =
- (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
-
- int pos = 0;
- for (KafkaTopicPartition partition : assignedPartitions) {
- // create the kafka version specific partition handle
- KPH kafkaHandle = createKafkaPartitionHandle(partition);
- partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
- }
-
- return partitions;
- }
-
- case PERIODIC_WATERMARKS: {
- @SuppressWarnings("unchecked")
- KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
- (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
- new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
-
- int pos = 0;
- for (KafkaTopicPartition partition : assignedPartitions) {
- KPH kafkaHandle = createKafkaPartitionHandle(partition);
-
- AssignerWithPeriodicWatermarks<T> assignerInstance =
- watermarksPeriodic.deserializeValue(userCodeClassLoader);
-
- partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
- partition, kafkaHandle, assignerInstance);
- }
-
- return partitions;
- }
-
- case PUNCTUATED_WATERMARKS: {
- @SuppressWarnings("unchecked")
- KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
- (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
- new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
-
- int pos = 0;
- for (KafkaTopicPartition partition : assignedPartitions) {
- KPH kafkaHandle = createKafkaPartitionHandle(partition);
-
- AssignerWithPunctuatedWatermarks<T> assignerInstance =
- watermarksPunctuated.deserializeValue(userCodeClassLoader);
-
- partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
- partition, kafkaHandle, assignerInstance);
- }
-
- return partitions;
- }
- default:
- // cannot happen, add this as a guard for the future
- throw new RuntimeException();
- }
- }
-
- // ------------------------- Metrics ----------------------------------
-
- /**
- * Add current and committed offsets to metric group
- *
- * @param metricGroup The metric group to use
- */
- protected void addOffsetStateGauge(MetricGroup metricGroup) {
- // add current and committed offsets as gauges
- MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
- MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
- for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
- currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
- committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
- }
- }
-
- /**
- * Gauge types
- */
- private enum OffsetGaugeType {
- CURRENT_OFFSET,
- COMMITTED_OFFSET
- }
-
- /**
- * Gauge for getting the offset of a KafkaTopicPartitionState.
- */
- private static class OffsetGauge implements Gauge<Long> {
-
- private final KafkaTopicPartitionState<?> ktp;
- private final OffsetGaugeType gaugeType;
-
- public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
- this.ktp = ktp;
- this.gaugeType = gaugeType;
- }
-
- @Override
- public Long getValue() {
- switch(gaugeType) {
- case COMMITTED_OFFSET:
- return ktp.getCommittedOffset();
- case CURRENT_OFFSET:
- return ktp.getOffset();
- default:
- throw new RuntimeException("Unknown gauge type: " + gaugeType);
- }
- }
- }
- // ------------------------------------------------------------------------
-
- /**
- * The periodic watermark emitter. In its given interval, it checks all partitions for
- * the current event time watermark, and possibly emits the next watermark.
- */
- private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
-
- private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
-
- private final SourceContext<?> emitter;
-
- private final ProcessingTimeService timerService;
-
- private final long interval;
-
- private long lastWatermarkTimestamp;
-
- //-------------------------------------------------
-
- PeriodicWatermarkEmitter(
- KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
- SourceContext<?> emitter,
- ProcessingTimeService timerService,
- long autoWatermarkInterval)
- {
- this.allPartitions = checkNotNull(allPartitions);
- this.emitter = checkNotNull(emitter);
- this.timerService = checkNotNull(timerService);
- this.interval = autoWatermarkInterval;
- this.lastWatermarkTimestamp = Long.MIN_VALUE;
- }
-
- //-------------------------------------------------
-
- public void start() {
- timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
- }
-
- @Override
- public void onProcessingTime(long timestamp) throws Exception {
-
- long minAcrossAll = Long.MAX_VALUE;
- for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
-
- // we access the current watermark for the periodic assigners under the state
- // lock, to prevent concurrent modification to any internal variables
- final long curr;
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (state) {
- curr = state.getCurrentWatermarkTimestamp();
- }
-
- minAcrossAll = Math.min(minAcrossAll, curr);
- }
-
- // emit next watermark, if there is one
- if (minAcrossAll > lastWatermarkTimestamp) {
- lastWatermarkTimestamp = minAcrossAll;
- emitter.emitWatermark(new Watermark(minAcrossAll));
- }
-
- // schedule the next watermark
- timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
- }
- }
-}
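An aside on the commit contract documented in commitInternalOffsetsToKafka above: the
checkpointed offsets point at the last processed record per partition, while Kafka's
committed offsets denote the next record to read. A minimal sketch of the required +1
translation, assuming the Kafka 0.9+ consumer classes (TopicPartition, OffsetAndMetadata)
and a hypothetical helper name; illustrative only, not the actual version-specific
implementation:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.Map;

    // hypothetical helper, not part of the Flink codebase
    static void commitCheckpointedOffsets(
            KafkaConsumer<?, ?> consumer,
            Map<KafkaTopicPartition, Long> checkpointedOffsets) {

        Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();
        for (Map.Entry<KafkaTopicPartition, Long> e : checkpointedOffsets.entrySet()) {
            // checkpointed offset = last processed record; the committed offset
            // must point to the next record to process, hence the +1
            commitMap.put(
                new TopicPartition(e.getKey().getTopic(), e.getKey().getPartition()),
                new OffsetAndMetadata(e.getValue() + 1));
        }
        consumer.commitSync(commitMap);
    }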
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
deleted file mode 100644
index c736493..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import javax.annotation.Nullable;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A proxy that communicates exceptions between threads. Typically used if an exception
- * from a spawned thread needs to be recognized by the "parent" (spawner) thread.
- *
- * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
- * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
- * Optionally, the parent can pass itself in the constructor to be interrupted as soon as
- * an exception occurs.
- *
- * <pre>
- * {@code
- *
- * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
- *
- * Thread subThread = new Thread() {
- *
- * public void run() {
- * try {
- * doSomething();
- * } catch (Throwable t) {
- * errorProxy.reportError(t);
- * } finally {
- * doSomeCleanup();
- * }
- * }
- * };
- * subThread.start();
- *
- * doSomethingElse();
- * errorProxy.checkAndThrowException();
- *
- * doSomethingMore();
- * errorProxy.checkAndThrowException();
- *
- * try {
- * subThread.join();
- * } catch (InterruptedException e) {
- * errorProxy.checkAndThrowException();
- * // restore interrupted status, if not caused by an exception
- * Thread.currentThread().interrupt();
- * }
- * }
- * </pre>
- */
-public class ExceptionProxy {
-
- /** The thread that should be interrupted when an exception occurs */
- private final Thread toInterrupt;
-
- /** The exception to throw */
- private final AtomicReference<Throwable> exception;
-
- /**
- * Creates an exception proxy that interrupts the given thread upon
- * report of an exception. The thread to interrupt may be null.
- *
- * @param toInterrupt The thread to interrupt upon an exception. May be null.
- */
- public ExceptionProxy(@Nullable Thread toInterrupt) {
- this.toInterrupt = toInterrupt;
- this.exception = new AtomicReference<>();
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Sets the exception and interrupts the target thread,
- * if no other exception has occurred so far.
- *
- * <p>The exception is only set (and the interruption is only triggered),
- * if no other exception was set before.
- *
- * @param t The exception that occurred
- */
- public void reportError(Throwable t) {
- // set the exception, if it is the first (and the exception is non-null)
- if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) {
- toInterrupt.interrupt();
- }
- }
-
- /**
- * Checks whether an exception has been set via {@link #reportError(Throwable)}.
- * If yes, that exception is re-thrown by this method.
- *
- * @throws Exception This method re-throws the exception, if set.
- */
- public void checkAndThrowException() throws Exception {
- Throwable t = exception.get();
- if (t != null) {
- if (t instanceof Exception) {
- throw (Exception) t;
- }
- else if (t instanceof Error) {
- throw (Error) t;
- }
- else {
- throw new Exception(t);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
deleted file mode 100644
index c68fe28..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Flink's description of a partition in a Kafka topic.
- * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...)
- *
- * <p>Note: This class must not change in its structure, because it would change the
- * serialization format and make previous savepoints unreadable.
- */
-public final class KafkaTopicPartition implements Serializable {
-
- /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
- * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
- private static final long serialVersionUID = 722083576322742325L;
-
- // ------------------------------------------------------------------------
-
- private final String topic;
- private final int partition;
- private final int cachedHash;
-
- public KafkaTopicPartition(String topic, int partition) {
- this.topic = requireNonNull(topic);
- this.partition = partition;
- this.cachedHash = 31 * topic.hashCode() + partition;
- }
-
- // ------------------------------------------------------------------------
-
- public String getTopic() {
- return topic;
- }
-
- public int getPartition() {
- return partition;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "KafkaTopicPartition{" +
- "topic='" + topic + '\'' +
- ", partition=" + partition +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- else if (o instanceof KafkaTopicPartition) {
- KafkaTopicPartition that = (KafkaTopicPartition) o;
- return this.partition == that.partition && this.topic.equals(that.topic);
- }
- else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return cachedHash;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- public static String toString(Map<KafkaTopicPartition, Long> map) {
- StringBuilder sb = new StringBuilder();
- for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
- KafkaTopicPartition ktp = p.getKey();
- sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
- }
- return sb.toString();
- }
-
- public static String toString(List<KafkaTopicPartition> partitions) {
- StringBuilder sb = new StringBuilder();
- for (KafkaTopicPartition p: partitions) {
- sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
- }
- return sb.toString();
- }
-
-
- public static List<KafkaTopicPartition> dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) {
- List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size());
- for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
- ret.add(ktpl.getTopicPartition());
- }
- return ret;
- }
-}
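Since equals() and hashCode() above are defined purely over topic and partition (with the
hash precomputed in the constructor), instances serve as stable map keys, which is exactly
how the fetcher snapshots offsets. A small usage sketch with a made-up topic name:

    import java.util.HashMap;

    HashMap<KafkaTopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new KafkaTopicPartition("orders", 0), 42L);

    // a fresh instance with the same topic/partition is equal and hashes
    // identically, so the lookup finds the stored offset
    long offset = offsets.get(new KafkaTopicPartition("orders", 0)); // 42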
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
deleted file mode 100644
index 1959a05..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.kafka.common.Node;
-
-import java.io.Serializable;
-
-/**
- * Serializable Topic Partition info with leader Node information.
- * This class is used at runtime.
- */
-public class KafkaTopicPartitionLeader implements Serializable {
-
- private static final long serialVersionUID = 9145855900303748582L;
-
- private final int leaderId;
- private final int leaderPort;
- private final String leaderHost;
- private final KafkaTopicPartition topicPartition;
- private final int cachedHash;
-
- public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) {
- this.topicPartition = topicPartition;
- if (leader == null) {
- this.leaderId = -1;
- this.leaderHost = null;
- this.leaderPort = -1;
- } else {
- this.leaderId = leader.id();
- this.leaderPort = leader.port();
- this.leaderHost = leader.host();
- }
- int cachedHash = (leader == null) ? 14 : leader.hashCode();
- this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
- }
-
- public KafkaTopicPartition getTopicPartition() {
- return topicPartition;
- }
-
- public Node getLeader() {
- if (this.leaderId == -1) {
- return null;
- } else {
- return new Node(leaderId, leaderHost, leaderPort);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof KafkaTopicPartitionLeader)) {
- return false;
- }
-
- KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
-
- if (!topicPartition.equals(that.topicPartition)) {
- return false;
- }
- return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost);
- }
-
- @Override
- public int hashCode() {
- return cachedHash;
- }
-
- @Override
- public String toString() {
- return "KafkaTopicPartitionLeader{" +
- "leaderId=" + leaderId +
- ", leaderPort=" + leaderPort +
- ", leaderHost='" + leaderHost + '\'' +
- ", topic=" + topicPartition.getTopic() +
- ", partition=" + topicPartition.getPartition() +
- '}';
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
deleted file mode 100644
index 7cb5f46..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-/**
- * The state that the Flink Kafka Consumer holds for each Kafka partition.
- * Includes the Kafka descriptor for partitions.
- *
- * <p>This class describes the most basic state (only the offset), subclasses
- * define more elaborate state, containing current watermarks and timestamp
- * extractors.
- *
- * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
- */
-public class KafkaTopicPartitionState<KPH> {
-
- /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
- * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
- public static final long OFFSET_NOT_SET = -915623761776L;
-
- // ------------------------------------------------------------------------
-
- /** The Flink description of a Kafka partition */
- private final KafkaTopicPartition partition;
-
- /** The Kafka description of a Kafka partition (varies across different Kafka versions) */
- private final KPH kafkaPartitionHandle;
-
- /** The offset within the Kafka partition that we already processed */
- private volatile long offset;
-
- /** The offset of the Kafka partition that has been committed */
- private volatile long committedOffset;
-
- // ------------------------------------------------------------------------
-
- public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
- this.partition = partition;
- this.kafkaPartitionHandle = kafkaPartitionHandle;
- this.offset = OFFSET_NOT_SET;
- this.committedOffset = OFFSET_NOT_SET;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Gets Flink's descriptor for the Kafka Partition.
- * @return The Flink partition descriptor.
- */
- public final KafkaTopicPartition getKafkaTopicPartition() {
- return partition;
- }
-
- /**
- * Gets Kafka's descriptor for the Kafka Partition.
- * @return The Kafka partition descriptor.
- */
- public final KPH getKafkaPartitionHandle() {
- return kafkaPartitionHandle;
- }
-
- public final String getTopic() {
- return partition.getTopic();
- }
-
- public final int getPartition() {
- return partition.getPartition();
- }
-
- /**
- * The current offset in the partition. This refers to the offset of the last element
- * that we retrieved and emitted successfully. It is the offset that should be stored in
- * a checkpoint.
- */
- public final long getOffset() {
- return offset;
- }
-
- public final void setOffset(long offset) {
- this.offset = offset;
- }
-
- public final boolean isOffsetDefined() {
- return offset != OFFSET_NOT_SET;
- }
-
- public final void setCommittedOffset(long offset) {
- this.committedOffset = offset;
- }
-
- public final long getCommittedOffset() {
- return committedOffset;
- }
-
-
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle
- + ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
deleted file mode 100644
index efdc73f..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-/**
- * A special version of the per-kafka-partition-state that additionally holds
- * a periodic watermark generator (and timestamp extractor) per partition.
- *
- * @param <T> The type of records handled by the watermark generator
- * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
- */
-public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
-
- /** The timestamp assigner and watermark generator for the partition */
- private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
-
- /** The last watermark timestamp generated by this partition */
- private long partitionWatermark;
-
- // ------------------------------------------------------------------------
-
- public KafkaTopicPartitionStateWithPeriodicWatermarks(
- KafkaTopicPartition partition, KPH kafkaPartitionHandle,
- AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
- {
- super(partition, kafkaPartitionHandle);
-
- this.timestampsAndWatermarks = timestampsAndWatermarks;
- this.partitionWatermark = Long.MIN_VALUE;
- }
-
- // ------------------------------------------------------------------------
-
- public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
- return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
- }
-
- public long getCurrentWatermarkTimestamp() {
- Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
- if (wm != null) {
- partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
- }
- return partitionWatermark;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
- + ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
deleted file mode 100644
index edf40ce..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import javax.annotation.Nullable;
-
-/**
- * A special version of the per-kafka-partition-state that additionally holds
- * a punctuated watermark generator (and timestamp extractor) per partition.
- *
- * <p>This class is not thread safe, but it gives volatile access to the current
- * partition watermark ({@link #getCurrentPartitionWatermark()}).
- *
- * @param <T> The type of records handled by the watermark generator
- * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
- */
-public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
-
- /** The timestamp assigner and watermark generator for the partition */
- private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
-
- /** The last watermark timestamp generated by this partition */
- private volatile long partitionWatermark;
-
- // ------------------------------------------------------------------------
-
- public KafkaTopicPartitionStateWithPunctuatedWatermarks(
- KafkaTopicPartition partition, KPH kafkaPartitionHandle,
- AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
- {
- super(partition, kafkaPartitionHandle);
-
- this.timestampsAndWatermarks = timestampsAndWatermarks;
- this.partitionWatermark = Long.MIN_VALUE;
- }
-
- // ------------------------------------------------------------------------
-
- public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
- return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
- }
-
- @Nullable
- public Watermark checkAndGetNewWatermark(T record, long timestamp) {
- Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
- if (mark != null && mark.getTimestamp() > partitionWatermark) {
- partitionWatermark = mark.getTimestamp();
- return mark;
- }
- else {
- return null;
- }
- }
-
- public long getCurrentPartitionWatermark() {
- return partitionWatermark;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()
- + ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
deleted file mode 100644
index 7a41ade..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class TypeUtil {
- private TypeUtil() {}
-
- /**
- * Creates TypeInformation array for an array of Classes.
- * @param fieldTypes classes to extract type information from
- * @return type information
- */
- public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
- TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
- for (int i = 0; i < fieldTypes.length; i++) {
- typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
- }
- return typeInfos;
- }
-}
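A short usage sketch for the utility above; the resulting entries are the TypeInformation
instances that Flink's TypeExtractor resolves for the given classes:

    import org.apache.flink.api.common.typeinfo.TypeInformation;

    TypeInformation<?>[] types =
        TypeUtil.toTypeInfo(new Class<?>[] { String.class, Integer.class, Double.class });
    // types[0] is the TypeInformation for String, types[1] for Integer, ...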
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
deleted file mode 100644
index cedb696..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals.metrics;
-
-import org.apache.flink.metrics.Gauge;
-
-/**
- * Gauge for getting the current value of a Kafka metric.
- */
-public class KafkaMetricWrapper implements Gauge<Double> {
- private final org.apache.kafka.common.Metric kafkaMetric;
-
- public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) {
- this.kafkaMetric = metric;
- }
-
- @Override
- public Double getValue() {
- return kafkaMetric.value();
- }
-}
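A sketch of how such a wrapper might be registered on a Flink MetricGroup; the helper
name and registration site are illustrative, not taken from the Flink codebase:

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.kafka.common.Metric;

    // hypothetical registration helper
    static void registerKafkaMetric(MetricGroup group, Metric metric) {
        // the gauge re-reads metric.value() each time Flink reports metrics,
        // so the Flink gauge tracks the live Kafka metric value
        group.gauge(metric.metricName().name(), new KafkaMetricWrapper(metric));
    }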
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index 9b848e0..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * # More Flink partitions than Kafka partitions
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- * Some (or all) Kafka partitions contain the output of more than one Flink partition.
- *
- * # Fewer Flink partitions than Kafka partitions
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- * </pre>
- *
- * Not all Kafka partitions contain data.
- * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner
- * (note that this will cause a lot of network connections between all the Flink
- * instances and all the Kafka brokers).
- */
-public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
- private static final long serialVersionUID = 1627268846962918126L;
-
- private int targetPartition = -1;
-
- @Override
- public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
- if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {
- throw new IllegalArgumentException();
- }
-
- this.targetPartition = partitions[parallelInstanceId % partitions.length];
- }
-
- @Override
- public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- if (targetPartition >= 0) {
- return targetPartition;
- } else {
- throw new RuntimeException("The partitioner has not been initialized properly");
- }
- }
-}
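To make the modulo mapping in open() concrete, a small usage sketch (the record and
partition values are made up):

    FixedPartitioner<String> partitioner = new FixedPartitioner<>();

    // subtask 3 of 4 parallel sink instances, topic with 2 partitions (IDs 0 and 1)
    partitioner.open(3, 4, new int[] { 0, 1 });

    // 3 % 2 == 1, so this subtask pins all of its records to partition 1
    int target = partitioner.partition("some-record", null, null, 2); // returns 1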
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
deleted file mode 100644
index 37e2ef6..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * Base class for partitioners that determine the target Kafka partition for each record produced by Flink.
- * It contains an open() method, which is called on each parallel instance.
- * Partitioners must be serializable!
- */
-public abstract class KafkaPartitioner<T> implements Serializable {
-
- private static final long serialVersionUID = -1974260817778593473L;
-
- /**
- * Initializer for the Partitioner.
- * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
- * @param parallelInstances the total number of parallel instances
- * @param partitions an array describing the partition IDs of the available Kafka partitions.
- */
- public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
- // overwrite this method if needed.
- }
-
- public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
-}
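The FixedPartitioner Javadoc above recommends a round-robin partitioner to avoid leaving Kafka partitions empty. This module does not ship one, so the following is only a sketch against the KafkaPartitioner contract (the class name is illustrative):

    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    public class RoundRobinKafkaPartitioner<T> extends KafkaPartitioner<T> {

        private static final long serialVersionUID = 1L;

        private int nextPartition;

        @Override
        public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
            // stagger the starting point so the parallel instances do not all begin at partition 0
            this.nextPartition = parallelInstanceId;
        }

        @Override
        public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            int p = nextPartition % numPartitions;
            nextPartition = p + 1;
            return p;
        }
    }

Note the trade-off stated in the FixedPartitioner Javadoc: round-robin distribution opens connections from every Flink instance to every Kafka broker.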
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
deleted file mode 100644
index d170058..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import java.io.IOException;
-
-
-/**
- * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
- */
-public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
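- // lazily instantiated on first use, because ObjectMapper is not Serializable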
- private ObjectMapper mapper;
-
- @Override
- public ObjectNode deserialize(byte[] message) throws IOException {
- if (mapper == null) {
- mapper = new ObjectMapper();
- }
- return mapper.readValue(message, ObjectNode.class);
- }
-
- @Override
- public boolean isEndOfStream(ObjectNode nextElement) {
- return false;
- }
-
-}
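A short, self-contained usage sketch of the schema above (the JSON payload and field names are invented for the example):

    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;

    public class JsonSchemaDemo {
        public static void main(String[] args) throws Exception {
            JSONDeserializationSchema schema = new JSONDeserializationSchema();
            byte[] message = "{\"userId\": 42, \"name\": \"flink\"}".getBytes("UTF-8");

            ObjectNode node = schema.deserialize(message);
            // field access works as described in the Javadoc above
            System.out.println(node.get("userId").asLong());  // 42
            System.out.println(node.get("name").asText());    // flink
        }
    }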
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
deleted file mode 100644
index 261a111..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.IOException;
-
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
-
-/**
- * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Key fields can be accessed by calling objectNode.get("key").get(<name>).as(<type>)
- * <p>
- * Value fields can be accessed by calling objectNode.get("value").get(<name>).as(<type>)
- * <p>
- * Metadata fields can be accessed by calling objectNode.get("metadata").get(<name>).as(<type>) and include
- * the "offset" (long), "topic" (String) and "partition" (int). They are only present if the schema
- * was created with <code>includeMetadata = true</code>.
- */
-public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
- private final boolean includeMetadata;
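- // lazily instantiated on first use, because ObjectMapper is not Serializable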
- private ObjectMapper mapper;
-
- public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
- this.includeMetadata = includeMetadata;
- }
-
- @Override
- public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- if (mapper == null) {
- mapper = new ObjectMapper();
- }
- ObjectNode node = mapper.createObjectNode();
- // the Kafka key or value may be null (for example for deleted / tombstone messages)
- if (messageKey != null) {
- node.set("key", mapper.readValue(messageKey, JsonNode.class));
- }
- if (message != null) {
- node.set("value", mapper.readValue(message, JsonNode.class));
- }
- if (includeMetadata) {
- node.putObject("metadata")
- .put("offset", offset)
- .put("topic", topic)
- .put("partition", partition);
- }
- return node;
- }
-
- @Override
- public boolean isEndOfStream(ObjectNode nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<ObjectNode> getProducedType() {
- return getForClass(ObjectNode.class);
- }
-}
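A self-contained sketch of how the key/value schema above assembles its output (topic, partition and offset are passed in the way the Kafka fetcher would supply them; the payloads are invented):

    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

    public class JsonKeyValueDemo {
        public static void main(String[] args) throws Exception {
            // true = also attach the "metadata" object
            JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);

            byte[] key = "{\"id\": 7}".getBytes("UTF-8");
            byte[] value = "{\"word\": \"flink\"}".getBytes("UTF-8");

            ObjectNode node = schema.deserialize(key, value, "my-topic", 0, 123L);

            System.out.println(node.get("key").get("id").asInt());            // 7
            System.out.println(node.get("value").get("word").asText());       // flink
            System.out.println(node.get("metadata").get("offset").asLong());  // 123
        }
    }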
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
deleted file mode 100644
index 4344810..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Deserialization schema from JSON to {@link Row}.
- *
- * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
- * the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- */
-public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
-
- /** Field names to parse. Indices match fieldTypes indices. */
- private final String[] fieldNames;
-
- /** Types to parse fields as. Indices match fieldNames indices. */
- private final TypeInformation<?>[] fieldTypes;
-
- /** Object mapper for parsing the JSON. */
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- /** Flag indicating whether to fail on a missing field. */
- private boolean failOnMissingField;
-
- /**
- * Creates a JSON deserialization schema for the given fields and type classes.
- *
- * @param fieldNames Names of JSON fields to parse.
- * @param fieldTypes Type classes to parse JSON fields as.
- */
- public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
- this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
- Preconditions.checkNotNull(fieldTypes, "Field types");
-
- Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
- "Number of provided field names and types does not match.");
-
- this.fieldTypes = new TypeInformation[fieldTypes.length];
- for (int i = 0; i < fieldTypes.length; i++) {
- this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
- }
- }
-
- /**
- * Creates a JSON deserialization schema for the given fields and types.
- *
- * @param fieldNames Names of JSON fields to parse.
- * @param fieldTypes Types to parse JSON fields as.
- */
- public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
- this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
-
- Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
- "Number of provided field names and types does not match.");
- }
-
- @Override
- public Row deserialize(byte[] message) throws IOException {
- try {
- JsonNode root = objectMapper.readTree(message);
-
- Row row = new Row(fieldNames.length);
- for (int i = 0; i < fieldNames.length; i++) {
- JsonNode node = root.get(fieldNames[i]);
-
- if (node == null) {
- if (failOnMissingField) {
- throw new IllegalStateException("Failed to find field with name '"
- + fieldNames[i] + "'.");
- } else {
- row.setField(i, null);
- }
- } else {
- // Read the value as specified type
- Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
- row.setField(i, value);
- }
- }
-
- return row;
- } catch (Throwable t) {
- throw new IOException("Failed to deserialize JSON object.", t);
- }
- }
-
- @Override
- public boolean isEndOfStream(Row nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Row> getProducedType() {
- return new RowTypeInfo(fieldTypes);
- }
-
- /**
- * Configures the failure behaviour if a JSON field is missing.
- *
- * <p>By default, a missing field is ignored and the field is set to null.
- *
- * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
- */
- public void setFailOnMissingField(boolean failOnMissingField) {
- this.failOnMissingField = failOnMissingField;
- }
-
-}
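A small, self-contained sketch of how the schema above behaves (field names, types and the JSON payload are invented for the example):

    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;

    public class JsonRowDemo {
        public static void main(String[] args) throws Exception {
            JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(
                new String[] {"id", "name"},
                new Class<?>[] {Long.class, String.class});

            Row row = schema.deserialize("{\"id\": 1, \"name\": \"flink\"}".getBytes("UTF-8"));
            System.out.println(row.productElement(0));  // 1
            System.out.println(row.productElement(1));  // flink

            // by default a missing field is set to null; to fail instead:
            // schema.setFailOnMissingField(true);
        }
    }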
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
deleted file mode 100644
index 077ff13..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.util.Preconditions;
-
-
-/**
- * Serialization schema that serializes a {@link Row} into JSON bytes.
- *
- * <p>Serializes the input {@link Row} object into a JSON string and
- * converts it into <code>byte[]</code>.
- *
- * <p>The resulting <code>byte[]</code> messages can be deserialized using
- * {@link JsonRowDeserializationSchema}.
- */
-public class JsonRowSerializationSchema implements SerializationSchema<Row> {
- /** Field names in the input Row object. */
- private final String[] fieldNames;
- /** Object mapper that is used to create output JSON objects. */
- private static final ObjectMapper mapper = new ObjectMapper();
-
- /**
- * Creates a JSON serialization schema for the given field names.
- *
- * @param fieldNames Names of the JSON fields to write, in the order of the fields in the input rows.
- */
- public JsonRowSerializationSchema(String[] fieldNames) {
- this.fieldNames = Preconditions.checkNotNull(fieldNames);
- }
-
- @Override
- public byte[] serialize(Row row) {
- if (row.productArity() != fieldNames.length) {
- throw new IllegalStateException(String.format(
- "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
- }
-
- ObjectNode objectNode = mapper.createObjectNode();
-
- for (int i = 0; i < row.productArity(); i++) {
- JsonNode node = mapper.valueToTree(row.productElement(i));
- objectNode.set(fieldNames[i], node);
- }
-
- try {
- return mapper.writeValueAsBytes(objectNode);
- } catch (Exception e) {
- throw new RuntimeException("Failed to serialize row", e);
- }
- }
-}
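Since the Javadoc above points back to JsonRowDeserializationSchema, a round-trip sketch shows the two schemas together (field names and values are invented):

    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
    import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;

    public class JsonRowRoundTrip {
        public static void main(String[] args) throws Exception {
            String[] fields = {"id", "name"};

            Row row = new Row(2);
            row.setField(0, 1L);
            row.setField(1, "flink");

            byte[] json = new JsonRowSerializationSchema(fields).serialize(row);
            System.out.println(new String(json, "UTF-8"));  // {"id":1,"name":"flink"}

            // read the bytes back with the matching deserializer
            Row back = new JsonRowDeserializationSchema(fields, new Class<?>[] {Long.class, String.class})
                .deserialize(json);
            System.out.println(back.productElement(1));     // flink
        }
    }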
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
deleted file mode 100644
index 01e72ca..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The deserialization schema describes how to turn the byte key / value messages delivered by certain
- * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
- * processed by Flink.
- *
- * @param <T> The type created by the keyed deserialization schema.
- */
-public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
-
- /**
- * Deserializes the byte message.
- *
- * @param messageKey the key as a byte array (null if no key has been set)
- * @param message the message as a byte array (null if the message was empty or deleted)
- * @param topic the topic the message has originated from
- * @param partition the partition the message has originated from
- * @param offset the offset of the message in the original source (for example the Kafka offset)
- * @return The deserialized message as an object.
- */
- T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
-
- /**
- * Method to decide whether the element signals the end of the stream. If
- * true is returned the element won't be emitted.
- *
- * @param nextElement The element to test for the end-of-stream signal.
- * @return True, if the element signals end of stream, false otherwise.
- */
- boolean isEndOfStream(T nextElement);
-}
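A minimal implementation of the interface above, decoding only the value as a UTF-8 string and ignoring the key and Kafka metadata (the class name StringValueSchema is illustrative):

    import java.io.IOException;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    public class StringValueSchema implements KeyedDeserializationSchema<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
            // the key and the Kafka metadata are available here but not used
            return message == null ? null : new String(message, "UTF-8");
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;  // never end the stream based on an element
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }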
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
deleted file mode 100644
index 4b9dba2..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.IOException;
-
-/**
- * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema
- * interface.
- *
- * @param <T> The type created by the deserialization schema.
- */
-public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
-
- private static final long serialVersionUID = 2651665280744549932L;
-
- private final DeserializationSchema<T> deserializationSchema;
-
- public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
- this.deserializationSchema = deserializationSchema;
- }
-
- @Override
- public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- return deserializationSchema.deserialize(message);
- }
-
- @Override
- public boolean isEndOfStream(T nextElement) {
- return deserializationSchema.isEndOfStream(nextElement);
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return deserializationSchema.getProducedType();
- }
-}
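A short usage sketch: lifting SimpleStringSchema (a plain DeserializationSchema shipped with Flink) to the keyed interface via the wrapper above:

    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class WrapperDemo {
        public static void main(String[] args) throws Exception {
            // key, topic, partition and offset are simply ignored by the wrapper
            KeyedDeserializationSchemaWrapper<String> keyed =
                new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());

            String s = keyed.deserialize(null, "hello".getBytes("UTF-8"), "my-topic", 0, 0L);
            System.out.println(s);  // hello
        }
    }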
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
deleted file mode 100644
index 701281e..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-/**
- * The serialization schema describes how to turn a data object into a different serialized
- * representation. Most data sinks (for example Apache Kafka) require the data to be handed
- * to them in a specific format (for example as byte strings).
- *
- * @param <T> The type to be serialized.
- */
-public interface KeyedSerializationSchema<T> extends Serializable {
-
- /**
- * Serializes the key of the incoming element to a byte array.
- * This method might return null if no key is available.
- *
- * @param element The incoming element to be serialized
- * @return the key of the element as a byte array
- */
- byte[] serializeKey(T element);
-
-
- /**
- * Serializes the value of the incoming element to a byte array.
- *
- * @param element The incoming element to be serialized
- * @return the value of the element as a byte array
- */
- byte[] serializeValue(T element);
-
- /**
- * Optional method to determine the target topic for the element.
- *
- * @param element Incoming element to determine the target topic from
- * @return null or the target topic
- */
- String getTargetTopic(T element);
-}
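An illustrative implementation of the interface above that uses getTargetTopic() for per-record routing (the class name and topic names are invented):

    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    public class RoutingStringSchema implements KeyedSerializationSchema<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serializeKey(String element) {
            return null;  // no key: the record is written unkeyed
        }

        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes();
        }

        @Override
        public String getTargetTopic(String element) {
            // route "error:" records to a dedicated topic; null means "use the producer's default topic"
            return element.startsWith("error:") ? "errors" : null;
        }
    }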
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
deleted file mode 100644
index 1b3e486..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-/**
- * A simple wrapper for using the SerializationSchema with the KeyedSerializationSchema
- * interface.
- *
- * @param <T> The type to serialize
- */
-public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
-
- private static final long serialVersionUID = 1351665280744549933L;
-
- private final SerializationSchema<T> serializationSchema;
-
- public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) {
- this.serializationSchema = serializationSchema;
- }
-
- @Override
- public byte[] serializeKey(T element) {
- return null;
- }
-
- @Override
- public byte[] serializeValue(T element) {
- return serializationSchema.serialize(element);
- }
-
- @Override
- public String getTargetTopic(T element) {
- return null; // we are never overriding the topic
- }
-}
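And a matching sketch for the producer-side wrapper above, again reusing SimpleStringSchema (which also implements SerializationSchema):

    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerWrapperDemo {
        public static void main(String[] args) {
            // records get no key and no per-record topic override
            KeyedSerializationSchemaWrapper<String> keyed =
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());

            System.out.println(keyed.serializeKey("hello"));               // null
            System.out.println(new String(keyed.serializeValue("hello"))); // hello
            System.out.println(keyed.getTargetTopic("hello"));             // null
        }
    }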