Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 23:03:18 UTC
[07/10] flink git commit: [FLINK-1935] Reimplement PersistentKafkaSource using high level Kafka API
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
deleted file mode 100644
index bf3b3f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.common.NotLeaderForPartitionException;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.MessageAndOffset;
-
-/**
- * Iterates the records received from a partition of a Kafka topic as byte arrays.
- *
- * This code is in part based on https://gist.github.com/ashrithr/5811266.
- */
-public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class);
-
- private List<String> hosts;
- private String topic;
- private int partition;
- private long readOffset;
- private transient SimpleConsumer consumer;
- private List<String> replicaBrokers;
- private String clientName;
- private Broker leadBroker;
- private final ConsumerConfig consumerConfig;
-
- private KafkaOffset initialOffset;
- private transient Iterator<MessageAndOffset> iter;
- private transient FetchResponse fetchResponse;
-
- /**
- * Constructor with configurable wait time on empty fetch. To connect to the Kafka service,
- * we use the so-called simple (low-level) Kafka API and connect directly to one of the brokers.
- *
- * @param topic
- * Name of the topic to listen to
- * @param partition
- * Partition in the chosen topic
- * @param initialOffset
- * Offset to start consuming at
- * @param kafkaTopicUtils
- * Util for receiving topic metadata
- */
- public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset,
- KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
-
- Set<String> brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition);
- this.hosts = new ArrayList<String>(brokerAddresses);
-
- this.consumerConfig = consumerConfig;
- this.topic = topic;
- this.partition = partition;
-
- this.initialOffset = initialOffset;
-
- this.replicaBrokers = new ArrayList<String>();
- }
-
- // --------------------------------------------------------------------------------------------
- // Initializing a connection
- // --------------------------------------------------------------------------------------------
-
- /**
- * Initializes the connection by detecting the leading broker of
- * the topic and establishing a connection to it.
- */
- public void initialize() {
- if (LOG.isInfoEnabled()) {
- LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts);
- }
-
- PartitionMetadata metadata = getPartitionMetadata();
-
- leadBroker = metadata.leader();
- clientName = "Client_" + topic + "_" + partition;
-
- consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
-
- try {
- readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
- } catch (NotLeaderForPartitionException e) {
- throw new RuntimeException("Unable to get offset",e);
- }
-
- try {
- resetFetchResponse(readOffset);
- } catch (ClosedChannelException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Got ClosedChannelException, trying to find new leader.");
- }
- findNewLeader();
- }
- }
-
- private PartitionMetadata getPartitionMetadata() {
- PartitionMetadata metadata;
- int retry = 0;
- int waitTime = consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_KEY, PersistentKafkaSource.WAIT_ON_FAILED_LEADER__MS_DEFAULT);
- do {
- metadata = findLeader(hosts, topic, partition);
- if(metadata == null) {
- retry++;
- if(retry == consumerConfig.props().getInt(PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_KEY, PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_DEFAULT)) {
- throw new RuntimeException("Tried finding a leader "+retry+" times without success");
- }
- LOG.warn("Unable to get leader and partition metadata. Waiting {} ms until retrying. Retries so far {}",waitTime, retry-1);
- try {
- Thread.sleep(waitTime);
- } catch (InterruptedException e) {
- throw new RuntimeException("Establishing connection to Kafka failed", e);
- }
- }
- } while (metadata == null);
-
- if (metadata.leader() == null) {
- throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
- }
-
- return metadata;
- }
-
- /**
- * Sets the partition to read from.
- *
- * @param partition
- * partition number
- */
- public void setPartition(int partition) {
- this.partition = partition;
- }
-
- // --------------------------------------------------------------------------------------------
- // Iterator methods
- // --------------------------------------------------------------------------------------------
-
- /**
- * Convenience method to emulate iterator behaviour.
- *
- * @return whether the iterator has a next element
- */
- public boolean hasNext() {
- return true;
- }
-
- /**
- * Returns the next message received from Kafka as a
- * byte array.
- *
- * @return next message as a byte array.
- */
- public byte[] next() {
- return nextWithOffset().getMessage();
- }
-
- public boolean fetchHasNext() {
- synchronized (fetchResponse) {
- if (!iter.hasNext()) {
- try {
- resetFetchResponse(readOffset);
- } catch (ClosedChannelException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Got ClosedChannelException, trying to find new leader.", e);
- }
- findNewLeader();
- }
- return iter.hasNext();
- } else {
- return true;
- }
- }
- }
-
- /**
- * Returns the next message and its offset received from
- * Kafka encapsulated in a POJO.
- *
- * @return next message and its offset.
- */
- public MessageWithMetadata nextWithOffset() {
-
- synchronized (fetchResponse) {
- if (!iter.hasNext()) {
- throw new RuntimeException(
- "Trying to read when response is not fetched. Call fetchHasNext() first.");
- }
-
- MessageAndOffset messageAndOffset = iter.next();
- long currentOffset = messageAndOffset.offset();
-
- while (currentOffset < readOffset) {
- messageAndOffset = iter.next();
- currentOffset = messageAndOffset.offset();
- }
-
- readOffset = messageAndOffset.nextOffset();
- ByteBuffer payload = messageAndOffset.message().payload();
-
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
-
- return new MessageWithMetadata(messageAndOffset.offset(), bytes, partition);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Internal utilities
- // --------------------------------------------------------------------------------------------
-
- private void resetFetchResponse(long offset) throws ClosedChannelException {
- FetchRequest req = new FetchRequestBuilder().clientId(clientName)
- .addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build();
-
- fetchResponse = consumer.fetch(req);
-
- if (fetchResponse.hasError()) {
- short code = fetchResponse.errorCode(topic, partition);
-
- if (LOG.isErrorEnabled()) {
- LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
- }
-
- if (code == ErrorMapping.OffsetOutOfRangeCode()) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Asked for invalid offset {}", offset);
- }
- String reset = consumerConfig.autoOffsetReset();
- if(reset.equals("smallest")) {
- LOG.info("Setting read offset to beginning (smallest)");
- readOffset = new BeginningOffset().getOffset(consumer, topic, partition, clientName);
- } else if(reset.equals("largest")) {
- LOG.info("Setting read offset to current offset (largest)");
- readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
- } else {
- throw new RuntimeException("Unknown 'autooffset.reset' mode '"+reset+"' Supported values are 'smallest' and 'largest'.");
- }
- }
-
- findNewLeader();
- }
-
- iter = fetchResponse.messageSet(topic, partition).iterator();
- }
-
- private void findNewLeader() {
- consumer.close();
- consumer = null;
- leadBroker = findNewLeader(leadBroker, topic, partition);
- consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
- }
-
- private PartitionMetadata findLeader(List<String> addresses, String topic, int partition) {
-
- PartitionMetadata returnMetaData = null;
- loop:
- for (String address : addresses) {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to find leader via broker: {}", address);
- }
-
- String[] split = address.split(":");
- String host = split[0];
- int port = Integer.parseInt(split[1]);
-
- SimpleConsumer consumer = null;
- try {
- consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup");
- List<String> topics = Collections.singletonList(topic);
-
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
-
- kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
- List<TopicMetadata> metaData = resp.topicsMetadata();
- for (TopicMetadata item : metaData) {
- for (PartitionMetadata part : item.partitionsMetadata()) {
- if (part.partitionId() == partition) {
- returnMetaData = part;
- break loop;
- }
- }
- }
- } catch (Exception e) {
- if (e instanceof ClosedChannelException) {
- LOG.warn("Got ClosedChannelException while trying to communicate with Broker" +
- "[{}] to find Leader for [{}, {}]. Trying other replicas.", address, topic, partition);
- } else {
- throw new RuntimeException("Error communicating with Broker [" + address + "] to find Leader for [" + topic + ", " + partition + "]", e);
- }
- } finally {
- if (consumer != null) {
- consumer.close();
- }
- }
- }
- if (returnMetaData != null) {
- replicaBrokers.clear();
- for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
- replicaBrokers.add(replica.host() + ":" + replica.port());
- }
- }
- return returnMetaData;
- }
-
- private Broker findNewLeader(Broker oldLeader, String topic, int a_partition) {
- for (int i = 0; i < 3; i++) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Trying to find a new leader after Broker failure.");
- }
- boolean goToSleep = false;
- PartitionMetadata metadata = findLeader(replicaBrokers, topic, a_partition);
- if (metadata == null) {
- goToSleep = true;
- } else if (metadata.leader() == null) {
- goToSleep = true;
- } else if (oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
- // first time through if the leader hasn't changed give ZooKeeper a second to recover
- // second time, assume the broker did recover before failover, or it was a non-Broker issue
- //
- goToSleep = true;
- } else {
- return metadata.leader();
- }
- if (goToSleep) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ie) {
- }
- }
- }
- throw new RuntimeException("Unable to find new leader after Broker failure.");
- }
-
- public int getId() {
- return this.partition;
- }
-
- @Override
- public String toString() {
- return "SinglePartitionIterator{partition="+partition+" readOffset="+readOffset+"}";
- }
-}
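
For readers unfamiliar with the low-level API the deleted iterator wraps: Kafka 0.8's SimpleConsumer requires the caller to find the partition leader, issue FetchRequests, and track offsets by hand, which is exactly the bookkeeping the class above implements. The following is a minimal sketch of that fetch pattern; the host, topic, partition and client name are illustrative placeholders, not values from the commit.

import java.nio.ByteBuffer;
import java.util.Iterator;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleConsumerSketch {
	public static void main(String[] args) {
		// Connect directly to one broker (ideally the partition leader).
		SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "demoClient");
		long offset = 0L;

		// Ask for up to 1 MB of messages from partition 0, starting at 'offset'.
		FetchRequest req = new FetchRequestBuilder()
				.clientId("demoClient")
				.addFetch("demoTopic", 0, offset, 1024 * 1024)
				.build();
		FetchResponse response = consumer.fetch(req);

		Iterator<MessageAndOffset> it = response.messageSet("demoTopic", 0).iterator();
		while (it.hasNext()) {
			MessageAndOffset mo = it.next();
			ByteBuffer payload = mo.message().payload();
			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);       // copy out the raw record bytes
			offset = mo.nextOffset(); // where the next fetch should start
		}
		consumer.close();
	}
}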
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
deleted file mode 100644
index 15e7b36..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.api.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class BeginningOffset extends KafkaOffset {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
- return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
deleted file mode 100644
index 6119f32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.api.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class CurrentOffset extends KafkaOffset {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
- return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
deleted file mode 100644
index 3aec7ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- * Offset given by a message read from Kafka.
- */
-public class GivenOffset extends KafkaOffset {
-
- private static final long serialVersionUID = 1L;
- private final long offset;
-
- public GivenOffset(long offset) {
- this.offset = offset;
- }
-
- @Override
- public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
- return offset;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
deleted file mode 100644
index 2eaa2b8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- * Superclass for various kinds of KafkaOffsets.
- */
-public abstract class KafkaOffset implements Serializable {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class);
-
- private static final long serialVersionUID = 1L;
-
- public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
- String clientName);
-
-	/**
-	 * Requests an offset for the given partition from its lead broker.
-	 *
-	 * @param consumer consumer connected to the partition's lead broker
-	 * @param topic topic to request the offset for
-	 * @param partition partition to request the offset for
-	 * @param whichTime type of offset request (latest time / earliest time)
-	 * @param clientName client id identifying this consumer
-	 * @return the offset returned by the broker
-	 */
- protected long getLastOffset(SimpleConsumer consumer, String topic, int partition,
- long whichTime, String clientName) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
- kafka.api.OffsetRequest.CurrentVersion(), clientName);
- OffsetResponse response = consumer.getOffsetsBefore(request);
-
- while (response.hasError()) {
- int errorCode = response.errorCode(topic, partition);
- LOG.warn("Response has error. Error code "+errorCode);
- switch (errorCode) {
- case 6:
- case 3:
- LOG.warn("Kafka broker trying to fetch from a non-leader broker.");
- break;
- default:
- throw new RuntimeException("Error fetching data from Kafka broker. Error code " + errorCode);
- }
-
- request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
- response = consumer.getOffsetsBefore(request);
- }
-
- long[] offsets = response.offsets(topic, partition);
-		if (offsets.length > 1) {
-			LOG.warn("The offset response unexpectedly contained more than one offset: " + Arrays.toString(offsets) + ". Using only the first one.");
- }
- return offsets[0];
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
deleted file mode 100644
index 02c49df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-/**
- * Enum controlling the offset behavior of the PersistentKafkaSource.
- */
-public enum Offset {
-	/**
-	 * Read the Kafka topic from the beginning.
-	 */
-	FROM_BEGINNING,
-	/**
-	 * Read the topic from the current offset (default).
-	 */
-	FROM_CURRENT
-}
\ No newline at end of file
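
Taken together, the deleted offset classes form a small strategy hierarchy: the Offset enum is the user-facing switch, and each KafkaOffset subclass resolves it to a concrete start position via an OffsetRequest. Below is a hedged sketch of how a source could wire them up; the helper method and its inputs are assumptions, only the class names come from the files above.

import org.apache.flink.streaming.connectors.kafka.api.simple.offset.*;

import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetSelectionSketch {
	// 'offsetMode' and 'consumer' are illustrative inputs; topic, partition
	// and client name are placeholders.
	static long startOffset(Offset offsetMode, SimpleConsumer consumer) {
		KafkaOffset start;
		switch (offsetMode) {
			case FROM_BEGINNING:
				start = new BeginningOffset(); // earliest retained offset
				break;
			case FROM_CURRENT:
			default:
				start = new CurrentOffset();   // latest offset: only records produced from now on
				break;
		}
		// A position restored from a checkpoint would instead be new GivenOffset(savedOffset).
		return start.getOffset(consumer, "demoTopic", 0, "demoClient");
	}
}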
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 87c6a34..246756c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -25,14 +25,20 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Properties;
import java.util.Random;
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
import kafka.consumer.ConsumerConfig;
-import org.apache.commons.lang.SerializationUtils;
+import kafka.network.SocketServer;
+import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -40,25 +46,25 @@ import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
+import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -66,6 +72,7 @@ import org.slf4j.LoggerFactory;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
+import scala.collection.Seq;
/**
* Code in this test is based on the following GitHub repository:
@@ -91,9 +98,15 @@ public class KafkaITCase {
private static TestingServer zookeeper;
private static List<KafkaServer> brokers;
+ private static String brokerConnectionStrings = "";
private static boolean shutdownKafkaBroker;
+ private static ConsumerConfig standardCC;
+
+ private static ZkClient zkClient;
+
+
@BeforeClass
public static void prepare() throws IOException {
LOG.info("Starting KafkaITCase.prepare()");
@@ -118,6 +131,12 @@ public class KafkaITCase {
brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+ SocketServer socketServer = brokers.get(i).socketServer();
+ String host = "localhost";
+ if(socketServer.host() != null) {
+ host = socketServer.host();
+ }
+ brokerConnectionStrings += host+":"+socketServer.port()+",";
}
LOG.info("ZK and KafkaServer started.");
@@ -125,6 +144,17 @@ public class KafkaITCase {
LOG.warn("Test failed with exception", t);
Assert.fail("Test failed with: " + t.getMessage());
}
+
+ Properties cProps = new Properties();
+ cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ cProps.setProperty("group.id", "flink-tests");
+ cProps.setProperty("auto.commit.enable", "false");
+
+ cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
+
+ standardCC = new ConsumerConfig(cProps);
+
+ zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
}
@AfterClass
@@ -142,8 +172,196 @@ public class KafkaITCase {
LOG.warn("ZK.stop() failed", e);
}
}
+ zkClient.close();
+ }
+
+
+ @Test
+ public void testOffsetManipulation() {
+ ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+ final String topicName = "testOffsetManipulation";
+
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topicName);
+ AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
+
+ Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+
+ zk.close();
+ }
+	/**
+	 * We want to use the high-level Java consumer API but manage the offsets in ZooKeeper manually.
+	 */
+ @Test
+ public void testPersistentSourceWithOffsetUpdates() throws Exception {
+ LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
+
+ ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+ final String topicName = "testOffsetHacking";
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(50);
+ env.setNumberOfExecutionRetries(0);
+
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topicName);
+ AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+ // write a sequence from 0 to 99 to each of the three partitions.
+ writeSequence(env, topicName, 0, 99);
+
+ readSequence(env, standardCC, topicName, 0, 100, 300);
+
+ // check offsets
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
+ Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));
+
+
+ LOG.info("Manipulating offsets");
+		// set the offset to 50 for each of the three partitions
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
+ PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
+
+ // create new env
+ env = StreamExecutionEnvironment.createLocalEnvironment(3);
+ env.getConfig().disableSysoutLogging();
+ readSequence(env, standardCC, topicName, 50, 50, 150);
+
+ zk.close();
+
+ LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
+ }
+
+ private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
+ LOG.info("Reading sequence for verification until final count {}", finalCount);
+ DataStream<Tuple2<Integer, Integer>> source = env.addSource(
+ new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
+ )
+				// add a sleeping mapper. Since there is no good way of "shutting down" a running topology,
+				// we resort to this trick: the job has to keep running until all checkpoints are confirmed.
+ .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ Thread.sleep(75);
+ return value;
+ }
+ }).setParallelism(3);
+
+ // verify data
+ DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+ int[] values = new int[valuesCount];
+ int count = 0;
+
+ @Override
+ public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+ values[value.f1 - valuesStartFrom]++;
+ count++;
+
+ LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
+ // verify if we've seen everything
+
+ if (count == finalCount) {
+ LOG.info("Received all values");
+ for (int i = 0; i < values.length; i++) {
+ int v = values[i];
+ if (v != 3) {
+ throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+ }
+ }
+ // test has passed
+ throw new SuccessException();
+ }
+ }
+
+ }).setParallelism(1);
+
+ tryExecute(env, "Read data from Kafka");
+
+ LOG.info("Successfully read sequence for verification");
+ }
+
+
+
+ private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
+ LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
+ DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+
+ @Override
+ public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+ LOG.info("Starting source.");
+ int cnt = from;
+ int partition = getRuntimeContext().getIndexOfThisSubtask();
+ while (running) {
+ LOG.info("Writing " + cnt + " to partition " + partition);
+ collector.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
+ if (cnt == to) {
+ LOG.info("Writer reached end.");
+ return;
+ }
+ cnt++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Source got cancel()");
+ running = false;
+ }
+ }).setParallelism(3);
+ stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
+ topicName,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
+ new T2Partitioner()
+ )).setParallelism(3);
+ env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
+ LOG.info("Finished writing sequence");
+ }
+
+ private static class T2Partitioner implements SerializableKafkaPartitioner {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ if(numPartitions != 3) {
+ throw new IllegalArgumentException("Expected three partitions");
+ }
+ Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+ return element.f0;
+ }
+ }
+
+ public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+ try {
+ see.execute(name);
+ } catch (JobExecutionException good) {
+ Throwable t = good.getCause();
+ int limit = 0;
+ while (!(t instanceof SuccessException)) {
+ if(t == null) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + good.getMessage());
+ }
+
+ t = t.getCause();
+ if (limit++ == 20) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + good.getMessage());
+ }
+ }
+ }
}
+
@Test
public void regularKafkaSourceTest() throws Exception {
LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
@@ -152,10 +370,9 @@ public class KafkaITCase {
createTestTopic(topic, 1, 1);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
// add consuming topology:
DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new TupleSerializationSchema(), 5000));
+ new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
int elCnt = 0;
int start = -1;
@@ -210,22 +427,9 @@ public class KafkaITCase {
running = false;
}
});
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
- try {
- env.setParallelism(1);
- env.execute();
- } catch (JobExecutionException good) {
- Throwable t = good.getCause();
- int limit = 0;
- while (!(t instanceof SuccessException)) {
- t = t.getCause();
- if (limit++ == 20) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + good.getMessage());
- }
- }
- }
+ tryExecute(env, "regular kafka source test");
LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
}
@@ -241,7 +445,10 @@ public class KafkaITCase {
// add consuming topology:
DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+ new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+ standardCC
+ ));
consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
int elCnt = 0;
int start = -1;
@@ -304,22 +511,9 @@ public class KafkaITCase {
running = false;
}
});
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
- try {
- env.setParallelism(1);
- env.execute();
- } catch (JobExecutionException good) {
- Throwable t = good.getCause();
- int limit = 0;
- while (!(t instanceof SuccessException)) {
- t = t.getCause();
- if (limit++ == 20) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + good.getMessage());
- }
- }
- }
+ tryExecute(env, "tupletesttopology");
LOG.info("Finished KafkaITCase.tupleTestTopology()");
}
@@ -347,16 +541,19 @@ public class KafkaITCase {
consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
consumerProps.setProperty("group.id", "test");
+ consumerProps.setProperty("auto.commit.enable", "false");
+ consumerProps.setProperty("auto.offset.reset", "smallest");
ConsumerConfig cc = new ConsumerConfig(consumerProps);
DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
- new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, Offset.FROM_BEGINNING, cc));
+ new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
int elCnt = 0;
@Override
public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+ LOG.info("Received {}", value.f0);
elCnt++;
if(value.f0 == -1) {
// we should have seen 11 elements now.
@@ -370,7 +567,7 @@ public class KafkaITCase {
throw new RuntimeException("More than 10 elements seen: "+elCnt);
}
}
- });
+ }).setParallelism(1);
// add producing topology
DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
@@ -398,6 +595,7 @@ public class KafkaITCase {
} catch (InterruptedException ignored) {
}
if(cnt == 10) {
+ LOG.info("Send end signal");
// signal end
collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
running = false;
@@ -412,31 +610,16 @@ public class KafkaITCase {
}
});
- stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(zookeeperConnectionString, topic,
+ stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
);
- try {
- env.setParallelism(1);
- env.execute();
- } catch (JobExecutionException good) {
- Throwable t = good.getCause();
- int limit = 0;
- while (!(t instanceof SuccessException)) {
- t = t.getCause();
- if (limit++ == 20) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + good.getMessage());
- }
- }
- }
+ tryExecute(env, "big topology test");
LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
}
- private static boolean partitionerHasBeenCalled = false;
-
@Test
public void customPartitioningTestTopology() throws Exception {
LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
@@ -449,7 +632,9 @@ public class KafkaITCase {
// add consuming topology:
DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
- new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+ new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+ standardCC));
consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
int start = -1;
BitSet validator = new BitSet(101);
@@ -519,23 +704,9 @@ public class KafkaITCase {
running = false;
}
});
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), new CustomPartitioner()));
-
- try {
- env.setParallelism(1);
- env.execute();
- } catch (JobExecutionException good) {
- Throwable t = good.getCause();
- int limit = 0;
- while (!(t instanceof SuccessException)) {
- t = t.getCause();
- if (limit++ == 20) {
- throw good;
- }
- }
+ stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner()));
- assertTrue(partitionerHasBeenCalled);
- }
+ tryExecute(env, "custom partitioning test");
LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
}
@@ -547,7 +718,6 @@ public class KafkaITCase {
@Override
public int partition(Object key, int numPartitions) {
- partitionerHasBeenCalled = true;
@SuppressWarnings("unchecked")
Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
@@ -561,25 +731,6 @@ public class KafkaITCase {
}
}
- private static class TupleSerializationSchema implements DeserializationSchema<Tuple2<Long, String>>, SerializationSchema<Tuple2<Long, String>, byte[]> {
-
- @SuppressWarnings("unchecked")
- @Override
- public Tuple2<Long, String> deserialize(byte[] message) {
- Object deserializedObject = SerializationUtils.deserialize(message);
- return (Tuple2<Long, String>) deserializedObject;
- }
-
- @Override
- public byte[] serialize(Tuple2<Long, String> element) {
- return SerializationUtils.serialize(element);
- }
-
- @Override
- public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
- return false;
- }
- }
@Test
public void simpleTestTopology() throws Exception {
@@ -591,7 +742,7 @@ public class KafkaITCase {
// add consuming topology:
DataStreamSource<String> consuming = env.addSource(
- new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
+ new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
consuming.addSink(new SinkFunction<String>() {
int elCnt = 0;
int start = -1;
@@ -643,34 +794,34 @@ public class KafkaITCase {
running = false;
}
});
- stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()));
+ stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
- try {
- env.setParallelism(1);
- env.execute();
- } catch (JobExecutionException good) {
- Throwable t = good.getCause();
- int limit = 0;
- while (!(t instanceof SuccessException)) {
- t = t.getCause();
- if (limit++ == 20) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + good.getMessage());
- }
- }
- }
+ tryExecute(env, "simpletest");
}
private static boolean leaderHasShutDown = false;
- @Test
+ @Test(timeout=60000)
public void brokerFailureTest() throws Exception {
String topic = "brokerFailureTestTopic";
createTestTopic(topic, 2, 2);
- KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
- final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
+ PartitionMetadata firstPart = null;
+ do {
+ if(firstPart != null) {
+ LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+ // not the first try. Sleep a bit
+ Thread.sleep(150);
+ }
+ Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+ firstPart = partitionMetadata.head();
+ } while(firstPart.errorCode() != 0);
+
+ final String leaderToShutDown = firstPart.leader().get().connectionString();
final Thread brokerShutdown = new Thread(new Runnable() {
@Override
@@ -704,7 +855,7 @@ public class KafkaITCase {
// add consuming topology:
DataStreamSource<String> consuming = env.addSource(
- new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 10, Offset.FROM_BEGINNING));
+ new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
consuming.setParallelism(1);
consuming.addSink(new SinkFunction<String>() {
@@ -717,7 +868,7 @@ public class KafkaITCase {
@Override
public void invoke(String value) throws Exception {
- LOG.info("Got message = " + value + " leader has shut down "+leaderHasShutDown+" el cnt = "+elCnt+" to rec"+ numOfMessagesToBeCorrect);
+ LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect);
String[] sp = value.split("-");
int v = Integer.parseInt(sp[1]);
@@ -736,8 +887,8 @@ public class KafkaITCase {
shutdownKafkaBroker = true;
}
-				if(leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
-					if (elCnt >= stopAfterMessages ) {
+				if (leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
+					if (elCnt >= stopAfterMessages) {
// check if everything in the bitset is set to true
int nc;
if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
@@ -779,29 +930,18 @@ public class KafkaITCase {
running = false;
}
});
- stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()))
+ stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
.setParallelism(1);
- try {
- env.setParallelism(1);
- env.execute();
- } catch (JobExecutionException good) {
- Throwable t = good.getCause();
- int limit = 0;
- while (!(t instanceof SuccessException)) {
- t = t.getCause();
- if (limit++ == 20) {
- LOG.warn("Test failed with exception", good);
- Assert.fail("Test failed with: " + good.getMessage());
- }
- }
- }
+ tryExecute(env, "broker failure test");
}
private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
- KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
- kafkaTopicUtils.createTopic(topic, numberOfPartitions, replicationFactor);
+ // create topic
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topic);
+ AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
}
private static TestingServer getZookeeper() throws Exception {
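
The new tests drive PersistentKafkaSource.setOffset/getOffset, which manipulate consumer offsets directly in ZooKeeper. Kafka 0.8's high-level consumer reads group offsets from znodes of the form /consumers/<group.id>/offsets/<topic>/<partition>; a hedged sketch of such a helper on top of ZkClient follows. The class and method names are illustrative, not the commit's implementation.

import org.I0Itec.zkclient.ZkClient;

public class ZkOffsetSketch {
	// Writes an offset to the znode the high-level consumer reads it from.
	static void writeOffset(ZkClient zk, String group, String topic, int partition, long offset) {
		String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;
		if (!zk.exists(path)) {
			zk.createPersistent(path, true); // create intermediate znodes as needed
		}
		// Serialized by the ZkClient's serializer (the tests use KafkaZKStringSerializer).
		zk.writeData(path, Long.toString(offset));
	}
}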
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
deleted file mode 100644
index 5f0e198..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.PartitionMetadata;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-public class KafkaTopicUtilsTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class);
- private static final int NUMBER_OF_BROKERS = 2;
- private static final String TOPIC = "myTopic";
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Test
- public void test() {
- int zkPort;
- String kafkaHost;
- String zookeeperConnectionString;
-
- File tmpZkDir;
- List<File> tmpKafkaDirs;
- Map<String, KafkaServer> kafkaServers = null;
- TestingServer zookeeper = null;
-
- try {
- tmpZkDir = tempFolder.newFolder();
-
- tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS);
- for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
- tmpKafkaDirs.add(tempFolder.newFolder());
- }
-
- zkPort = NetUtils.getAvailablePort();
- kafkaHost = InetAddress.getLocalHost().getHostName();
- zookeeperConnectionString = "localhost:" + zkPort;
-
- // init zookeeper
- zookeeper = new TestingServer(zkPort, tmpZkDir);
-
- // init kafka kafkaServers
- kafkaServers = new HashMap<String, KafkaServer>();
-
- for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
- KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i));
- kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(), kafkaServer);
- }
-
- // create Kafka topic
- final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
- kafkaTopicUtils.createTopic(TOPIC, 1, 2);
-
- // check whether topic exists
- assertTrue(kafkaTopicUtils.topicExists(TOPIC));
-
- // check number of partitions
- assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC));
-
- // get partition metadata without error
- PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0);
- assertEquals(0, partitionMetadata.errorCode());
-
- // get broker list
- assertEquals(new HashSet<String>(kafkaServers.keySet()), kafkaTopicUtils.getBrokerAddresses(TOPIC));
- } catch (IOException e) {
- fail(e.toString());
- } catch (Exception e) {
- fail(e.toString());
- } finally {
- LOG.info("Shutting down all services");
- for (KafkaServer broker : kafkaServers.values()) {
- if (broker != null) {
- broker.shutdown();
- }
- }
-
- if (zookeeper != null) {
- try {
- zookeeper.stop();
- } catch (IOException e) {
- LOG.warn("ZK.stop() failed", e);
- }
- }
- }
- }
-
- /**
- * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
- */
- private static KafkaServer getKafkaServer(String kafkaHost, String zookeeperConnectionString, int brokerId, File tmpFolder) throws UnknownHostException {
- Properties kafkaProperties = new Properties();
-
- int kafkaPort = NetUtils.getAvailablePort();
-
- // properties have to be Strings
- kafkaProperties.put("advertised.host.name", kafkaHost);
- kafkaProperties.put("port", Integer.toString(kafkaPort));
- kafkaProperties.put("broker.id", Integer.toString(brokerId));
- kafkaProperties.put("log.dir", tmpFolder.toString());
- kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
- KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
- KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
- server.startup();
- return server;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
index dc20726..9ede613 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 196f7ec..4bd89c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.api.checkpoint;
+import java.io.Serializable;
+
/**
* This interface marks a function/operator as <i>asynchronously checkpointed</i>.
* Similar to the {@link Checkpointed} interface, the function must produce a
@@ -32,4 +34,4 @@ package org.apache.flink.streaming.api.checkpoint;
* {@link #snapshotState(long, long)} method is typically a copy or shadow copy
* of the actual state.</p>
*/
-public interface CheckpointedAsynchronously extends Checkpointed {}
+public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}
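
With the type parameter added above, an implementor states its snapshot type once and gets a typed restore. A minimal sketch of a user function against the new signature, assuming the snapshotState(long, long) / restoreState(T) contract of the Checkpointed interface in this codebase (the class and its counter field are illustrative, not part of this commit):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

    // Illustrative function whose Long counter is checkpointed asynchronously.
    public class CountingMapper implements MapFunction<String, String>,
            CheckpointedAsynchronously<Long> {

        private long count;

        @Override
        public String map(String value) {
            count++;
            return value;
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            // Return a copy of the state, per the javadoc above.
            return count;
        }

        @Override
        public void restoreState(Long state) {
            count = state;
        }
    }
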
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index a59407c..3efad93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -30,8 +30,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public JobExecutionResult execute() throws Exception {
- return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(),
- getConfig().isSysoutLoggingEnabled());
+ return execute(DEFAULT_JOB_NAME);
}
/**
@@ -44,7 +43,9 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
- getConfig().isSysoutLoggingEnabled());
+ JobExecutionResult result = ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
+ getConfig().isSysoutLoggingEnabled());
+ streamGraph.clear(); // clear graph to allow submitting another job via the same environment.
+ return result;
}
}
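
Because execute() now clears the graph after a successful mini-cluster run, one local environment can be used for consecutive submissions. A hedged sketch of the pattern this enables (element values and job names are placeholders):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TwoJobsExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

            env.fromElements(1, 2, 3).print();
            env.execute("first job");   // graph is cleared after this run

            // Without the clear() above, nodes of the first topology
            // would leak into this second submission.
            env.fromElements(4, 5, 6).print();
            env.execute("second job");
        }
    }
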
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b0471f9..2e33b82 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
-import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
@@ -67,6 +67,8 @@ import com.esotericsoftware.kryo.Serializer;
*/
public abstract class StreamExecutionEnvironment {
+ public final static String DEFAULT_JOB_NAME = "Flink Streaming Job";
+
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
private long bufferTimeout = 100;
@@ -624,8 +626,8 @@ public abstract class StreamExecutionEnvironment {
TypeInformation<OUT> outTypeInfo;
- if (function instanceof GenericSourceFunction) {
- outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
+ if (function instanceof ResultTypeQueryable) {
+ outTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
} else {
try {
outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
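
Since the environment now checks for ResultTypeQueryable, any source function can announce its output type without implementing the removed GenericSourceFunction. A sketch, assuming the run(Collector)/cancel() source contract of this codebase at the time (the class itself is illustrative):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;

    // Illustrative source that declares its output type explicitly,
    // letting the environment skip reflective type extraction.
    public class ByteArraySource implements SourceFunction<byte[]>, ResultTypeQueryable<byte[]> {

        private volatile boolean running = true;

        @Override
        public void run(Collector<byte[]> collector) throws Exception {
            while (running) {
                collector.collect(new byte[] { 42 });
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public TypeInformation<byte[]> getProducedType() {
            return TypeExtractor.getForClass(byte[].class);
        }
    }
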
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
deleted file mode 100644
index 0113cfe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public interface GenericSourceFunction<T> {
-
- TypeInformation<T> getType();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 93bf8eb..271c05c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -60,8 +60,7 @@ import org.slf4j.LoggerFactory;
public class StreamGraph extends StreamingPlan {
private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
- private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
- private String jobName = DEAFULT_JOB_NAME;
+ private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
private final StreamExecutionEnvironment environemnt;
private final ExecutionConfig executionConfig;
@@ -70,17 +69,25 @@ public class StreamGraph extends StreamingPlan {
private long checkpointingInterval = 5000;
private boolean chaining = true;
- private final Map<Integer, StreamNode> streamNodes;
- private final Set<Integer> sources;
+ private Map<Integer, StreamNode> streamNodes;
+ private Set<Integer> sources;
- private final Map<Integer, StreamLoop> streamLoops;
- protected final Map<Integer, StreamLoop> vertexIDtoLoop;
+ private Map<Integer, StreamLoop> streamLoops;
+ protected Map<Integer, StreamLoop> vertexIDtoLoop;
public StreamGraph(StreamExecutionEnvironment environment) {
this.environemnt = environment;
executionConfig = environment.getConfig();
+ // create a new, empty stream graph.
+ clear();
+ }
+
+ /**
+ * Removes all registered nodes, sources, and loops so the graph can be reused.
+ */
+ public void clear() {
streamNodes = new HashMap<Integer, StreamNode>();
streamLoops = new HashMap<Integer, StreamLoop>();
vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f66e394..24a08eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,8 +137,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
callUserFunction();
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
- LOG.error("Calling user function failed due to: {}",
- StringUtils.stringifyException(e));
+ LOG.error("Calling user function failed", e);
}
throw new RuntimeException(e);
}
@@ -168,7 +166,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
try {
FunctionUtils.closeFunction(userFunction);
} catch (Exception e) {
- throw new RuntimeException("Error when closing the function: " + e.getMessage());
+ throw new RuntimeException("Error when closing the function", e);
}
}
@@ -187,8 +185,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
public void setChainingStrategy(ChainingStrategy strategy) {
if (strategy == ChainingStrategy.ALWAYS) {
if (!(this instanceof ChainableStreamOperator)) {
- throw new RuntimeException(
- "Operator needs to extend ChainableOperator to be chained");
+ throw new RuntimeException("Operator needs to extend ChainableOperator to be chained");
}
}
this.chainingStrategy = strategy;
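
The logging changes above hand the Throwable itself to SLF4J and to the wrapping RuntimeException instead of stringifying it, so stack traces and cause chains survive. A minimal runnable illustration of that pattern (class and messages are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExceptionChainingExample {

        private static final Logger LOG = LoggerFactory.getLogger(ExceptionChainingExample.class);

        public static void main(String[] args) {
            try {
                throw new IllegalStateException("user function failed");
            } catch (Exception e) {
                // Passing the Throwable as the last argument makes SLF4J
                // print the full stack trace, not just a message string.
                LOG.error("Calling user function failed", e);
                // Chaining keeps the original exception reachable via getCause().
                throw new RuntimeException("Error when closing the function", e);
            }
        }
    }
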
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
deleted file mode 100644
index 4dd4b45..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.runtime.state.OperatorState;
-
-/**
- * Base class for representing operator states that can be repartitioned for
- * state redistribution and load balancing.
- *
- * @param <T>
- * The type of the operator state.
- */
-public abstract class PartitionableState<T> extends OperatorState<T> {
-
- private static final long serialVersionUID = 1L;
-
- PartitionableState(T initialState) {
- super(initialState);
- }
-
- /**
- * Repartitions (divides) the current state into the given number of new
- * partitions. The created partitions are used to redistribute and then
- * rebuild the state among the parallel instances of the operator. The
- * implementation should reflect the partitioning of the input values to
- * maintain correct operator behavior.
- *
- * <br/><br/> It is also assumed that applying {@link #reBuild} to the
- * repartitioned state yields essentially the same state as before.
- *
- *
- * @param numberOfPartitions
- * The desired number of partitions. The method must return an
- * array of that size.
- * @return The array containing the state part for each partition.
- */
- public abstract OperatorState<T>[] repartition(int numberOfPartitions);
-
- /**
- * Rebuilds the current state partition from the given parts. Used for
- * building the state after a re-balance phase.
- *
- * @param parts
- * The state parts that will be used to rebuild the current
- * partition.
- * @return The rebuilt operator state.
- */
- public abstract OperatorState<T> reBuild(OperatorState<T>... parts);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 930c9b4..0259568 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -273,7 +273,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
synchronized (checkpointLock) {
if (isRunning) {
try {
- LOG.info("Starting checkpoint " + checkpointId);
+ LOG.info("Starting checkpoint {} on task {}", checkpointId, getName());
// first draw the state that should go into checkpoint
LocalStateHandle state;
@@ -282,7 +282,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
state = userState == null ? null : new LocalStateHandle(userState);
}
catch (Exception e) {
- throw new Exception("Error while drawing snapshot of the user state.");
+ throw new Exception("Error while drawing snapshot of the user state.", e);
}
// now emit the checkpoint barriers
@@ -333,8 +333,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
}
catch (Exception e) {
- throw new RuntimeException(
- "Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+ throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index faaa79b..87c9757 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -17,9 +17,11 @@
package org.apache.flink.streaming.util.serialization;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
import java.io.Serializable;
-public interface DeserializationSchema<T> extends Serializable {
+public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
* Deserializes the incoming data.
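
Extending the interface with ResultTypeQueryable obliges every schema to report its produced type, which is what lets schema-driven sources supply type information without the removed GenericSourceFunction. The diffs below update the built-in schemas accordingly; a user-defined schema would follow the same pattern. A sketch, assuming the deserialize/isEndOfStream methods of this interface at this point (IntSchema is illustrative):

    import java.nio.ByteBuffer;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    // Illustrative schema decoding big-endian ints from raw messages.
    public class IntSchema implements DeserializationSchema<Integer> {

        @Override
        public Integer deserialize(byte[] message) {
            return ByteBuffer.wrap(message).getInt();
        }

        @Override
        public boolean isEndOfStream(Integer nextElement) {
            return false; // this stream carries no end-of-stream marker
        }

        @Override
        public TypeInformation<Integer> getProducedType() {
            return BasicTypeInfo.INT_TYPE_INFO;
        }
    }
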
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
index 93d13ab..a4b1419 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.util.serialization;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
@@ -38,4 +40,9 @@ public class JavaDefaultStringSchema implements DeserializationSchema<String>, S
return SerializationUtils.deserialize(message);
}
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeExtractor.getForClass(String.class);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
index e457bef..9c5885f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
@@ -17,6 +17,9 @@
package org.apache.flink.streaming.util.serialization;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
public class RawSchema implements DeserializationSchema<byte[]>,
SerializationSchema<byte[], byte[]> {
@@ -36,4 +39,9 @@ public class RawSchema implements DeserializationSchema<byte[]>,
public byte[] serialize(byte[] element) {
return element;
}
+
+ @Override
+ public TypeInformation<byte[]> getProducedType() {
+ return TypeExtractor.getForClass(byte[].class);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 3d0a0d5..7c5946d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -17,6 +17,9 @@
package org.apache.flink.streaming.util.serialization;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
public class SimpleStringSchema implements DeserializationSchema<String>,
SerializationSchema<String, String> {
@@ -37,4 +40,8 @@ public class SimpleStringSchema implements DeserializationSchema<String>,
return element;
}
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeExtractor.getForClass(String.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
deleted file mode 100644
index 136a091..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.junit.Test;
-
-public class OperatorStateTest {
-
- @Test
- public void testOperatorState() {
- OperatorState<Integer> os = new OperatorState<Integer>(5);
-
- StateCheckpoint<Integer> scp = os.checkpoint();
-
- assertTrue(os.stateEquals(scp.restore()));
-
- assertEquals((Integer) 5, os.getState());
-
- os.update(10);
-
- assertEquals((Integer) 10, os.getState());
- }
-
-}