Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 23:03:18 UTC

[07/10] flink git commit: [FLINK-1935] Reimplement PersistentKafkaSource using high level Kafka API

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
deleted file mode 100644
index bf3b3f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.common.NotLeaderForPartitionException;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.MessageAndOffset;
-
-/**
- * Iterates the records received from a partition of a Kafka topic as byte arrays.
- *
- * This code is in parts based on https://gist.github.com/ashrithr/5811266.
- */
-public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class);
-
-	private List<String> hosts;
-	private String topic;
-	private int partition;
-	private long readOffset;
-	private transient SimpleConsumer consumer;
-	private List<String> replicaBrokers;
-	private String clientName;
-	private Broker leadBroker;
-	private final ConsumerConfig consumerConfig;
-
-	private KafkaOffset initialOffset;
-	private transient Iterator<MessageAndOffset> iter;
-	private transient FetchResponse fetchResponse;
-
-	/**
-	 * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
-	 * we use the so called simple or low level Kafka API thus directly connecting to one of the brokers.
-	 *
-	 * @param topic
-	 * 		Name of the topic to listen to
-	 * @param partition
-	 * 		Partition in the chosen topic
-	 * @param initialOffset
-	 * 		Offset to start consuming at
-	 * @param kafkaTopicUtils
-	 * 		Util for receiving topic metadata
-	 */
-	public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset,
-			KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
-
-		Set<String> brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition);
-		this.hosts = new ArrayList<String>(brokerAddresses);
-
-		this.consumerConfig = consumerConfig;
-		this.topic = topic;
-		this.partition = partition;
-
-		this.initialOffset = initialOffset;
-
-		this.replicaBrokers = new ArrayList<String>();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Initializing a connection
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Initializes the connection by detecting the leading broker of
-	 * the topic and establishing a connection to it.
-	 */
-	public void initialize() {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts);
-		}
-
-		PartitionMetadata metadata = getPartitionMetadata();
-
-		leadBroker = metadata.leader();
-		clientName = "Client_" + topic + "_" + partition;
-
-		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
-
-		try {
-			readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
-		} catch (NotLeaderForPartitionException e) {
-			throw new RuntimeException("Unable to get offset",e);
-		}
-
-		try {
-			resetFetchResponse(readOffset);
-		} catch (ClosedChannelException e) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("Got ClosedChannelException, trying to find new leader.");
-			}
-			findNewLeader();
-		}
-	}
-
-	private PartitionMetadata getPartitionMetadata() {
-		PartitionMetadata metadata;
-		int retry = 0;
-		int waitTime = consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_KEY, PersistentKafkaSource.WAIT_ON_FAILED_LEADER__MS_DEFAULT);
-		do {
-			metadata = findLeader(hosts, topic, partition);
-			/*try {
-				Thread.sleep(10000);
-			} catch (InterruptedException e) {
-				throw new RuntimeException("Establishing connection to Kafka failed", e);
-			} */
-			if(metadata == null) {
-				retry++;
-				if(retry == consumerConfig.props().getInt(PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_KEY, PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_DEFAULT)) {
-					throw new RuntimeException("Tried finding a leader "+retry+" times without success");
-				}
-				LOG.warn("Unable to get leader and partition metadata. Waiting {} ms until retrying. Retries so far {}",waitTime, retry-1);
-				try {
-					Thread.sleep(waitTime);
-				} catch (InterruptedException e) {
-					throw new RuntimeException("Establishing connection to Kafka failed", e);
-				}
-			}
-		} while (metadata == null);
-
-		if (metadata.leader() == null) {
-			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
-		}
-
-		return metadata;
-	}
-
-	/**
-	 * Sets the partition to read from.
-	 *
-	 * @param partition
-	 * 		partition number
-	 */
-	public void setPartition(int partition) {
-		this.partition = partition;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Iterator methods
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Convenience method to emulate iterator behaviour.
-	 *
-	 * @return whether the iterator has a next element
-	 */
-	public boolean hasNext() {
-		return true;
-	}
-
-	/**
-	 * Returns the next message received from Kafka as a
-	 * byte array.
-	 *
-	 * @return next message as a byte array.
-	 */
-	public byte[] next() {
-		return nextWithOffset().getMessage();
-	}
-
-	public boolean fetchHasNext() {
-		synchronized (fetchResponse) {
-			if (!iter.hasNext()) {
-				try {
-					resetFetchResponse(readOffset);
-				} catch (ClosedChannelException e) {
-					if (LOG.isWarnEnabled()) {
-						LOG.warn("Got ClosedChannelException, trying to find new leader.", e);
-					}
-					findNewLeader();
-				}
-				return iter.hasNext();
-			} else {
-				return true;
-			}
-		}
-	}
-
-	/**
-	 * Returns the next message and its offset received from
-	 * Kafka encapsulated in a POJO.
-	 *
-	 * @return next message and its offset.
-	 */
-	public MessageWithMetadata nextWithOffset() {
-
-		synchronized (fetchResponse) {
-			if (!iter.hasNext()) {
-				throw new RuntimeException(
-						"Trying to read when response is not fetched. Call fetchHasNext() first.");
-			}
-
-			MessageAndOffset messageAndOffset = iter.next();
-			long currentOffset = messageAndOffset.offset();
-
-			while (currentOffset < readOffset) {
-				messageAndOffset = iter.next();
-				currentOffset = messageAndOffset.offset();
-			}
-
-			readOffset = messageAndOffset.nextOffset();
-			ByteBuffer payload = messageAndOffset.message().payload();
-
-			byte[] bytes = new byte[payload.limit()];
-			payload.get(bytes);
-
-			return new MessageWithMetadata(messageAndOffset.offset(), bytes, partition);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal utilities
-	// --------------------------------------------------------------------------------------------
-
-	private void resetFetchResponse(long offset) throws ClosedChannelException {
-		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
-				.addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build();
-
-		fetchResponse = consumer.fetch(req);
-
-		if (fetchResponse.hasError()) {
-			short code = fetchResponse.errorCode(topic, partition);
-
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
-			}
-
-			if (code == ErrorMapping.OffsetOutOfRangeCode()) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Asked for invalid offset {}", offset);
-				}
-				String reset = consumerConfig.autoOffsetReset();
-				if(reset.equals("smallest")) {
-					LOG.info("Setting read offset to beginning (smallest)");
-					readOffset = new BeginningOffset().getOffset(consumer, topic, partition, clientName);
-				} else if(reset.equals("largest")) {
-					LOG.info("Setting read offset to current offset (largest)");
-					readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
-				} else {
-					throw new RuntimeException("Unknown 'autooffset.reset' mode '"+reset+"' Supported values are 'smallest' and 'largest'.");
-				}
-			}
-
-			findNewLeader();
-		}
-
-		iter = fetchResponse.messageSet(topic, partition).iterator();
-	}
-
-	private void findNewLeader() {
-		consumer.close();
-		consumer = null;
-		leadBroker = findNewLeader(leadBroker, topic, partition);
-		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
-	}
-
-	private PartitionMetadata findLeader(List<String> addresses, String topic, int partition) {
-
-		PartitionMetadata returnMetaData = null;
-		loop:
-		for (String address : addresses) {
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Trying to find leader via broker: {}", address);
-			}
-
-			String[] split = address.split(":");
-			String host = split[0];
-			int port = Integer.parseInt(split[1]);
-
-			SimpleConsumer consumer = null;
-			try {
-				consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup");
-				List<String> topics = Collections.singletonList(topic);
-
-				TopicMetadataRequest req = new TopicMetadataRequest(topics);
-
-				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-				List<TopicMetadata> metaData = resp.topicsMetadata();
-				for (TopicMetadata item : metaData) {
-					for (PartitionMetadata part : item.partitionsMetadata()) {
-						if (part.partitionId() == partition) {
-							returnMetaData = part;
-							break loop;
-						}
-					}
-				}
-			} catch (Exception e) {
-				if (e instanceof ClosedChannelException) {
-					LOG.warn("Got ClosedChannelException while trying to communicate with Broker" +
-							"[{}] to find Leader for [{}, {}]. Trying other replicas.", address, topic, partition);
-				} else {
-					throw new RuntimeException("Error communicating with Broker [" + address + "] to find Leader for [" + topic + ", " + partition + "]", e);
-				}
-			} finally {
-				if (consumer != null) {
-					consumer.close();
-				}
-			}
-		}
-		if (returnMetaData != null) {
-			replicaBrokers.clear();
-			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
-				replicaBrokers.add(replica.host() + ":" + replica.port());
-			}
-		}
-		return returnMetaData;
-	}
-
-	private Broker findNewLeader(Broker oldLeader, String topic, int a_partition) {
-		for (int i = 0; i < 3; i++) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Trying to find a new leader after Broker failure.");
-			}
-			boolean goToSleep = false;
-			PartitionMetadata metadata = findLeader(replicaBrokers, topic, a_partition);
-			if (metadata == null) {
-				goToSleep = true;
-			} else if (metadata.leader() == null) {
-				goToSleep = true;
-			} else if (oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
-				// first time through if the leader hasn't changed give ZooKeeper a second to recover
-				// second time, assume the broker did recover before failover, or it was a non-Broker issue
-				//
-				goToSleep = true;
-			} else {
-				return metadata.leader();
-			}
-			if (goToSleep) {
-				try {
-					Thread.sleep(10000);
-				} catch (InterruptedException ie) {
-				}
-			}
-		}
-		throw new RuntimeException("Unable to find new leader after Broker failure.");
-	}
-
-	public int getId() {
-		return this.partition;
-	}
-
-	@Override
-	public String toString() {
-		return "SinglePartitionIterator{partition="+partition+" readOffset="+readOffset+"}";
-	}
-}
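
[Editorial note] For readers who have not used Kafka's low-level consumer: the removed iterator above connects straight to a partition's leader broker and polls it with fetch requests, advancing its own read offset. The standalone sketch below condenses that loop using the same Kafka 0.8 SimpleConsumer calls that appear in the deleted class; the broker address, topic name, fetch size and class name are illustrative values, and the snippet is not part of this commit.

	import java.nio.ByteBuffer;
	import java.util.Iterator;

	import kafka.api.FetchRequest;
	import kafka.api.FetchRequestBuilder;
	import kafka.javaapi.FetchResponse;
	import kafka.javaapi.consumer.SimpleConsumer;
	import kafka.message.MessageAndOffset;

	public class SimpleConsumerFetchSketch {

		public static void main(String[] args) {
			final String topic = "testTopic";   // example value
			final int partition = 0;            // example value
			final String clientName = "Client_" + topic + "_" + partition;
			long readOffset = 0L;               // in the real source this comes from a KafkaOffset strategy

			// Connect directly to the partition leader (assumed here to run on localhost:9092).
			SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024, clientName);
			try {
				for (int round = 0; round < 10; round++) {
					// Ask the leader for up to 1 MB of messages starting at readOffset.
					FetchRequest request = new FetchRequestBuilder()
							.clientId(clientName)
							.addFetch(topic, partition, readOffset, 1024 * 1024)
							.build();
					FetchResponse response = consumer.fetch(request);
					if (response.hasError()) {
						// The deleted iterator reacts to OffsetOutOfRange and leader changes at this point.
						System.err.println("Fetch failed with error code " + response.errorCode(topic, partition));
						break;
					}

					Iterator<MessageAndOffset> messages = response.messageSet(topic, partition).iterator();
					while (messages.hasNext()) {
						MessageAndOffset messageAndOffset = messages.next();
						if (messageAndOffset.offset() < readOffset) {
							continue; // skip messages below the requested offset (possible with compressed batches)
						}
						readOffset = messageAndOffset.nextOffset();

						ByteBuffer payload = messageAndOffset.message().payload();
						byte[] bytes = new byte[payload.limit()];
						payload.get(bytes);
						System.out.println("offset " + messageAndOffset.offset() + ": " + bytes.length + " bytes");
					}
				}
			} finally {
				consumer.close();
			}
		}
	}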

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
deleted file mode 100644
index 15e7b36..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.api.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class BeginningOffset extends KafkaOffset {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
-		return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
deleted file mode 100644
index 6119f32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.api.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class CurrentOffset extends KafkaOffset {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
-		return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
deleted file mode 100644
index 3aec7ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- * Offset given by a message read from Kafka.
- */
-public class GivenOffset extends KafkaOffset {
-
-	private static final long serialVersionUID = 1L;
-	private final long offset;
-
-	public GivenOffset(long offset) {
-		this.offset = offset;
-	}
-
-	@Override
-	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
-		return offset;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
deleted file mode 100644
index 2eaa2b8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-/**
- * Superclass for various kinds of KafkaOffsets.
- */
-public abstract class KafkaOffset implements Serializable {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class);
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
-			String clientName);
-
-	/**
-	 *
-	 * @param consumer
-	 * @param topic
-	 * @param partition
-	 * @param whichTime Type of offset request (latest time / earliest time)
-	 * @param clientName
-	 * @return
-	 */
-	protected long getLastOffset(SimpleConsumer consumer, String topic, int partition,
-			long whichTime, String clientName) {
-		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-
-		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
-				kafka.api.OffsetRequest.CurrentVersion(), clientName);
-		OffsetResponse response = consumer.getOffsetsBefore(request);
-
-		while (response.hasError()) {
-			int errorCode = response.errorCode(topic, partition);
-			LOG.warn("Response has error. Error code "+errorCode);
-			switch (errorCode) {
-				case 6:
-				case 3:
-					LOG.warn("Kafka broker trying to fetch from a non-leader broker.");
-					break;
-				default:
-					throw new RuntimeException("Error fetching data from Kafka broker. Error code " + errorCode);
-			}
-
-			request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-			response = consumer.getOffsetsBefore(request);
-		}
-
-		long[] offsets = response.offsets(topic, partition);
-		if(offsets.length > 1) {
-			LOG.warn("The offset response unexpectedly contained more than one offset: "+ Arrays.toString(offsets) + " Using only first one");
-		}
-		return offsets[0];
-	}
-
-}
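
[Editorial note] The removed offset classes above act as small strategies for choosing the first offset to read; the iterator falls back to them when its requested offset is out of range. The sketch below shows how the "auto.offset.reset" setting selects one of them, mirroring the handling in resetFetchResponse() earlier in this commit. The wrapper class and method here are illustrative; only the offset classes themselves come from the removed code.

	import kafka.consumer.ConsumerConfig;
	import kafka.javaapi.consumer.SimpleConsumer;

	import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
	import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
	import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;

	public final class InitialOffsetSketch {

		// Picks the first offset to read the way the removed iterator does:
		// "smallest" -> earliest retained offset, "largest" -> current log head.
		static long initialOffset(ConsumerConfig config, SimpleConsumer consumer,
				String topic, int partition, String clientName) {
			String reset = config.autoOffsetReset();
			KafkaOffset strategy;
			if ("smallest".equals(reset)) {
				strategy = new BeginningOffset();
			} else if ("largest".equals(reset)) {
				strategy = new CurrentOffset();
			} else {
				throw new IllegalArgumentException(
						"Unsupported 'auto.offset.reset' value '" + reset + "'; expected 'smallest' or 'largest'");
			}
			return strategy.getOffset(consumer, topic, partition, clientName);
		}
	}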

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
deleted file mode 100644
index 02c49df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/Offset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
-
-/**
- * Enum controlling the offset behavior of the PersistentKafkaSource.
- */
-public enum Offset {
-	/**
-	 * Read the Kafka topology from the beginning
-	 */
-	FROM_BEGINNING,
-	/**
-	 * Read the topology from the current offset. (Default).
-	 */
-	FROM_CURRENT
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 87c6a34..246756c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -25,14 +25,20 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
 import kafka.consumer.ConsumerConfig;
-import org.apache.commons.lang.SerializationUtils;
+import kafka.network.SocketServer;
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -40,25 +46,25 @@ import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
+import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -66,6 +72,7 @@ import org.slf4j.LoggerFactory;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
+import scala.collection.Seq;
 
 /**
  * Code in this test is based on the following GitHub repository:
@@ -91,9 +98,15 @@ public class KafkaITCase {
 
 	private static TestingServer zookeeper;
 	private static List<KafkaServer> brokers;
+	private static String brokerConnectionStrings = "";
 
 	private static boolean shutdownKafkaBroker;
 
+	private static ConsumerConfig standardCC;
+
+	private static ZkClient zkClient;
+
+
 	@BeforeClass
 	public static void prepare() throws IOException {
 		LOG.info("Starting KafkaITCase.prepare()");
@@ -118,6 +131,12 @@ public class KafkaITCase {
 			brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
 			for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+				SocketServer socketServer = brokers.get(i).socketServer();
+				String host = "localhost";
+				if(socketServer.host() != null) {
+					host = socketServer.host();
+				}
+				brokerConnectionStrings += host+":"+socketServer.port()+",";
 			}
 
 			LOG.info("ZK and KafkaServer started.");
@@ -125,6 +144,17 @@ public class KafkaITCase {
 			LOG.warn("Test failed with exception", t);
 			Assert.fail("Test failed with: " + t.getMessage());
 		}
+
+		Properties cProps = new Properties();
+		cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		cProps.setProperty("group.id", "flink-tests");
+		cProps.setProperty("auto.commit.enable", "false");
+
+		cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
+
+		standardCC = new ConsumerConfig(cProps);
+
+		zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
 	}
 
 	@AfterClass
@@ -142,8 +172,196 @@ public class KafkaITCase {
 				LOG.warn("ZK.stop() failed", e);
 			}
 		}
+		zkClient.close();
+	}
+
+
+	@Test
+	public void testOffsetManipulation() {
+		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+		final String topicName = "testOffsetManipulation";
+
+		// create topic
+		Properties topicConfig = new Properties();
+		LOG.info("Creating topic {}", topicName);
+		AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
+
+		Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+
+		zk.close();
+	}
+	/**
+	 * We want to use the High level java consumer API but manage the offset in Zookeeper manually.
+	 *
+	 */
+	@Test
+	public void testPersistentSourceWithOffsetUpdates() throws Exception {
+		LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
+
+		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+		final String topicName = "testOffsetHacking";
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(50);
+		env.setNumberOfExecutionRetries(0);
+
+		// create topic
+		Properties topicConfig = new Properties();
+		LOG.info("Creating topic {}", topicName);
+		AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+		// write a sequence from 0 to 99 to each of the three partitions.
+		writeSequence(env, topicName, 0, 99);
+
+		readSequence(env, standardCC, topicName, 0, 100, 300);
+
+		// check offsets
+		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
+		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));
+
+
+		LOG.info("Manipulating offsets");
+		// set the offset to 25, 50, and 75 for the three partitions
+		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
+		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
+		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
+
+		// create new env
+		env = StreamExecutionEnvironment.createLocalEnvironment(3);
+		env.getConfig().disableSysoutLogging();
+		readSequence(env, standardCC, topicName, 50, 50, 150);
+
+		zk.close();
+
+		LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
+	}
+
+	private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
+		LOG.info("Reading sequence for verification until final count {}", finalCount);
+		DataStream<Tuple2<Integer, Integer>> source = env.addSource(
+				new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
+		)
+		//add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have
+		// to play this trick. The problem is that we have to wait until all checkpoints are confirmed
+		.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+			@Override
+			public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+				Thread.sleep(75);
+				return value;
+			}
+		}).setParallelism(3);
+
+		// verify data
+		DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+			int[] values = new int[valuesCount];
+			int count = 0;
+
+			@Override
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+				values[value.f1 - valuesStartFrom]++;
+				count++;
+
+				LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
+				// verify if we've seen everything
+
+				if (count == finalCount) {
+					LOG.info("Received all values");
+					for (int i = 0; i < values.length; i++) {
+						int v = values[i];
+						if (v != 3) {
+							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+						}
+					}
+					// test has passed
+					throw new SuccessException();
+				}
+			}
+
+		}).setParallelism(1);
+
+		tryExecute(env, "Read data from Kafka");
+
+		LOG.info("Successfully read sequence for verification");
+	}
+
+
+
+	private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
+		LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
+		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = from;
+				int partition = getRuntimeContext().getIndexOfThisSubtask();
+				while (running) {
+					LOG.info("Writing " + cnt + " to partition " + partition);
+					collector.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
+					if (cnt == to) {
+						LOG.info("Writer reached end.");
+						return;
+					}
+					cnt++;
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		}).setParallelism(3);
+		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
+				topicName,
+				new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
+				new T2Partitioner()
+		)).setParallelism(3);
+		env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
+		LOG.info("Finished writing sequence");
+	}
+
+	private static class T2Partitioner implements SerializableKafkaPartitioner {
+		@Override
+		public int partition(Object key, int numPartitions) {
+			if(numPartitions != 3) {
+				throw new IllegalArgumentException("Expected three partitions");
+			}
+			Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+			return element.f0;
+		}
+	}
+
+	public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+		try {
+			see.execute(name);
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				if(t == null) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
+				}
+				
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
+				}
+			}
+		}
 	}
 
+
 	@Test
 	public void regularKafkaSourceTest() throws Exception {
 		LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
@@ -152,10 +370,9 @@ public class KafkaITCase {
 		createTestTopic(topic, 1, 1);
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
 		// add consuming topology:
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-				new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new TupleSerializationSchema(), 5000));
+				new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
 		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
 			int elCnt = 0;
 			int start = -1;
@@ -210,22 +427,9 @@ public class KafkaITCase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
 
-		try {
-			env.setParallelism(1);
-			env.execute();
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				t = t.getCause();
-				if (limit++ == 20) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-			}
-		}
+		tryExecute(env, "regular kafka source test");
 
 		LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
 	}
@@ -241,7 +445,10 @@ public class KafkaITCase {
 
 		// add consuming topology:
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-				new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+				new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+						new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+						standardCC
+				));
 		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
 			int elCnt = 0;
 			int start = -1;
@@ -304,22 +511,9 @@ public class KafkaITCase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
 
-		try {
-			env.setParallelism(1);
-			env.execute();
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				t = t.getCause();
-				if (limit++ == 20) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-			}
-		}
+		tryExecute(env, "tupletesttopology");
 
 		LOG.info("Finished KafkaITCase.tupleTestTopology()");
 	}
@@ -347,16 +541,19 @@ public class KafkaITCase {
 		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
 		consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		consumerProps.setProperty("group.id", "test");
+		consumerProps.setProperty("auto.commit.enable", "false");
+		consumerProps.setProperty("auto.offset.reset", "smallest");
 
 		ConsumerConfig cc = new ConsumerConfig(consumerProps);
 		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
-				new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, Offset.FROM_BEGINNING, cc));
+				new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
 
 		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
 			int elCnt = 0;
 
 			@Override
 			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+				LOG.info("Received {}", value.f0);
 				elCnt++;
 				if(value.f0 == -1) {
 					// we should have seen 11 elements now.
@@ -370,7 +567,7 @@ public class KafkaITCase {
 					throw new RuntimeException("More than 10 elements seen: "+elCnt);
 				}
 			}
-		});
+		}).setParallelism(1);
 
 		// add producing topology
 		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
@@ -398,6 +595,7 @@ public class KafkaITCase {
 					} catch (InterruptedException ignored) {
 					}
 					if(cnt == 10) {
+						LOG.info("Send end signal");
 						// signal end
 						collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
 						running = false;
@@ -412,31 +610,16 @@ public class KafkaITCase {
 			}
 		});
 
-		stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(zookeeperConnectionString, topic,
+		stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
 				new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
 		);
 
-		try {
-			env.setParallelism(1);
-			env.execute();
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				t = t.getCause();
-				if (limit++ == 20) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-			}
-		}
+		tryExecute(env, "big topology test");
 
 		LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
 	}
 
 
-	private static boolean partitionerHasBeenCalled = false;
-
 	@Test
 	public void customPartitioningTestTopology() throws Exception {
 		LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
@@ -449,7 +632,9 @@ public class KafkaITCase {
 
 		// add consuming topology:
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-				new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+				new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+						new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+						standardCC));
 		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
 			int start = -1;
 			BitSet validator = new BitSet(101);
@@ -519,23 +704,9 @@ public class KafkaITCase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), new CustomPartitioner()));
-
-		try {
-			env.setParallelism(1);
-			env.execute();
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				t = t.getCause();
-				if (limit++ == 20) {
-					throw good;
-				}
-			}
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner()));
 
-			assertTrue(partitionerHasBeenCalled);
-		}
+		tryExecute(env, "custom partitioning test");
 
 		LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
 	}
@@ -547,7 +718,6 @@ public class KafkaITCase {
 
 		@Override
 		public int partition(Object key, int numPartitions) {
-			partitionerHasBeenCalled = true;
 
 			@SuppressWarnings("unchecked")
 			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
@@ -561,25 +731,6 @@ public class KafkaITCase {
 		}
 	}
 
-	private static class TupleSerializationSchema implements DeserializationSchema<Tuple2<Long, String>>, SerializationSchema<Tuple2<Long, String>, byte[]> {
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public Tuple2<Long, String> deserialize(byte[] message) {
-			Object deserializedObject = SerializationUtils.deserialize(message);
-			return (Tuple2<Long, String>) deserializedObject;
-		}
-
-		@Override
-		public byte[] serialize(Tuple2<Long, String> element) {
-			return SerializationUtils.serialize(element);
-		}
-
-		@Override
-		public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
-			return false;
-		}
-	}
 
 	@Test
 	public void simpleTestTopology() throws Exception {
@@ -591,7 +742,7 @@ public class KafkaITCase {
 
 		// add consuming topology:
 		DataStreamSource<String> consuming = env.addSource(
-				new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
+				new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
 		consuming.addSink(new SinkFunction<String>() {
 			int elCnt = 0;
 			int start = -1;
@@ -643,34 +794,34 @@ public class KafkaITCase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()));
+		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
 
-		try {
-			env.setParallelism(1);
-			env.execute();
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				t = t.getCause();
-				if (limit++ == 20) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-			}
-		}
+		tryExecute(env, "simpletest");
 	}
 
 	private static boolean leaderHasShutDown = false;
 
-	@Test
+	@Test(timeout=60000)
 	public void brokerFailureTest() throws Exception {
 		String topic = "brokerFailureTestTopic";
 
 		createTestTopic(topic, 2, 2);
 
-		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-		final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+	//	KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
+	//	final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
+		PartitionMetadata firstPart = null;
+		do {
+			if(firstPart != null) {
+				LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+				// not the first try. Sleep a bit
+				Thread.sleep(150);
+			}
+			Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+			firstPart = partitionMetadata.head();
+		} while(firstPart.errorCode() != 0);
+
+		final String leaderToShutDown = firstPart.leader().get().connectionString();
 
 		final Thread brokerShutdown = new Thread(new Runnable() {
 			@Override
@@ -704,7 +855,7 @@ public class KafkaITCase {
 
 		// add consuming topology:
 		DataStreamSource<String> consuming = env.addSource(
-				new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 10, Offset.FROM_BEGINNING));
+				new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
 		consuming.setParallelism(1);
 
 		consuming.addSink(new SinkFunction<String>() {
@@ -717,7 +868,7 @@ public class KafkaITCase {
 
 			@Override
 			public void invoke(String value) throws Exception {
-				LOG.info("Got message = " + value + " leader has shut down "+leaderHasShutDown+" el cnt = "+elCnt+" to rec"+ numOfMessagesToBeCorrect);
+				LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect);
 				String[] sp = value.split("-");
 				int v = Integer.parseInt(sp[1]);
 
@@ -736,8 +887,8 @@ public class KafkaITCase {
 					shutdownKafkaBroker = true;
 				}
 
-				if(leaderHasShutDown) { // it only makes sence to check once the shutdown is completed
-					if (elCnt >= stopAfterMessages ) {
+				if (leaderHasShutDown) { // it only makes sence to check once the shutdown is completed
+					if (elCnt >= stopAfterMessages) {
 						// check if everything in the bitset is set to true
 						int nc;
 						if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
@@ -779,29 +930,18 @@ public class KafkaITCase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()))
+		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
 				.setParallelism(1);
 
-		try {
-			env.setParallelism(1);
-			env.execute();
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				t = t.getCause();
-				if (limit++ == 20) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-			}
-		}
+		tryExecute(env, "broker failure test");
 	}
 
 
 	private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
-		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-		kafkaTopicUtils.createTopic(topic, numberOfPartitions, replicationFactor);
+		// create topic
+		Properties topicConfig = new Properties();
+		LOG.info("Creating topic {}", topic);
+		AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
 	}
 
 	private static TestingServer getZookeeper() throws Exception {
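
[Editorial note] The new tests above (testOffsetManipulation and testPersistentSourceWithOffsetUpdates) drive PersistentKafkaSource.setOffset() and getOffset(), which keep the consumer group's read positions in ZooKeeper instead of relying on Kafka's auto-commit. Those helpers are defined outside this file, so the sketch below only shows the general technique of manual offset storage with ZkClient, assuming the standard Kafka 0.8 consumer layout /consumers/<group>/offsets/<topic>/<partition>. The connection string, group id, topic and znode layout are assumptions for illustration, not taken from this commit.

	import java.nio.charset.Charset;

	import org.I0Itec.zkclient.ZkClient;
	import org.I0Itec.zkclient.exception.ZkMarshallingError;
	import org.I0Itec.zkclient.serialize.ZkSerializer;

	public class ZkOffsetSketch {

		// Kafka stores offsets as plain UTF-8 strings, so use a String (de)serializer.
		static final ZkSerializer STRING_SERIALIZER = new ZkSerializer() {
			@Override
			public byte[] serialize(Object data) throws ZkMarshallingError {
				return ((String) data).getBytes(Charset.forName("UTF-8"));
			}

			@Override
			public Object deserialize(byte[] bytes) throws ZkMarshallingError {
				return bytes == null ? null : new String(bytes, Charset.forName("UTF-8"));
			}
		};

		public static void main(String[] args) {
			ZkClient zk = new ZkClient("localhost:2181", 6000, 6000, STRING_SERIALIZER);
			// Assumed path layout: /consumers/<group.id>/offsets/<topic>/<partition>
			String path = "/consumers/flink-tests/offsets/testOffsetManipulation/0";

			// Write the offset, creating the znode and its parents on first use.
			if (!zk.exists(path)) {
				zk.createPersistent(path, true);
			}
			zk.writeData(path, Long.toString(1337L));

			// Read it back.
			long offset = Long.parseLong((String) zk.readData(path));
			System.out.println("offset = " + offset);

			zk.close();
		}
	}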

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
deleted file mode 100644
index 5f0e198..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.api.PartitionMetadata;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-public class KafkaTopicUtilsTest {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class);
-	private static final int NUMBER_OF_BROKERS = 2;
-	private static final String TOPIC = "myTopic";
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Test
-	public void test() {
-		int zkPort;
-		String kafkaHost;
-		String zookeeperConnectionString;
-
-		File tmpZkDir;
-		List<File> tmpKafkaDirs;
-		Map<String, KafkaServer> kafkaServers = null;
-		TestingServer zookeeper = null;
-
-		try {
-			tmpZkDir = tempFolder.newFolder();
-
-			tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS);
-			for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
-				tmpKafkaDirs.add(tempFolder.newFolder());
-			}
-
-			zkPort = NetUtils.getAvailablePort();
-			kafkaHost = InetAddress.getLocalHost().getHostName();
-			zookeeperConnectionString = "localhost:" + zkPort;
-
-			// init zookeeper
-			zookeeper = new TestingServer(zkPort, tmpZkDir);
-
-			// init kafka kafkaServers
-			kafkaServers = new HashMap<String, KafkaServer>();
-
-			for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
-				KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i));
-				kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(), kafkaServer);
-			}
-
-			// create Kafka topic
-			final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-			kafkaTopicUtils.createTopic(TOPIC, 1, 2);
-
-			// check whether topic exists
-			assertTrue(kafkaTopicUtils.topicExists(TOPIC));
-
-			// check number of partitions
-			assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC));
-
-			// get partition metadata without error
-			PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0);
-			assertEquals(0, partitionMetadata.errorCode());
-
-			// get broker list
-			assertEquals(new HashSet<String>(kafkaServers.keySet()), kafkaTopicUtils.getBrokerAddresses(TOPIC));
-		} catch (IOException e) {
-			fail(e.toString());
-		} catch (Exception e) {
-			fail(e.toString());
-		} finally {
-			LOG.info("Shutting down all services");
-			for (KafkaServer broker : kafkaServers.values()) {
-				if (broker != null) {
-					broker.shutdown();
-				}
-			}
-
-			if (zookeeper != null) {
-				try {
-					zookeeper.stop();
-				} catch (IOException e) {
-					LOG.warn("ZK.stop() failed", e);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-	 */
-	private static KafkaServer getKafkaServer(String kafkaHost, String zookeeperConnectionString, int brokerId, File tmpFolder) throws UnknownHostException {
-		Properties kafkaProperties = new Properties();
-
-		int kafkaPort = NetUtils.getAvailablePort();
-
-		// properties have to be Strings
-		kafkaProperties.put("advertised.host.name", kafkaHost);
-		kafkaProperties.put("port", Integer.toString(kafkaPort));
-		kafkaProperties.put("broker.id", Integer.toString(brokerId));
-		kafkaProperties.put("log.dir", tmpFolder.toString());
-		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
-		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
-		server.startup();
-		return server;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
index dc20726..9ede613 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 196f7ec..4bd89c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
+import java.io.Serializable;
+
 /**
  * This interface marks a function/operator as <i>asynchronously checkpointed</i>.
  * Similar to the {@link Checkpointed} interface, the function must produce a
@@ -32,4 +34,4 @@ package org.apache.flink.streaming.api.checkpoint;
  * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
  * of the actual state.</p>
  */
-public interface CheckpointedAsynchronously extends Checkpointed {}
+public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}
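
Under the new signature a user function declares the type of its checkpointed state directly. A hedged sketch, assuming the snapshotState(long, long) / restoreState(T) contract of the Checkpointed interface; the class and field names are illustrative only:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

public class CountingMapper implements MapFunction<String, String>,
		CheckpointedAsynchronously<Long> {

	private long count;

	@Override
	public String map(String value) {
		count++;
		return value;
	}

	@Override
	public Long snapshotState(long checkpointId, long checkpointTimestamp) {
		// the returned value is a copy of the state, so the runtime can
		// materialize it asynchronously while processing continues
		return count;
	}

	@Override
	public void restoreState(Long state) {
		this.count = state;
	}
}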

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index a59407c..3efad93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -30,8 +30,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute() throws Exception {
-		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(),
-																getConfig().isSysoutLoggingEnabled());
+		return execute(DEFAULT_JOB_NAME);
 	}
 
 	/**
@@ -44,7 +43,9 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
-																		getConfig().isSysoutLoggingEnabled());
+		JobExecutionResult result = ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
+				getConfig().isSysoutLoggingEnabled());
+		streamGraph.clear(); // clear graph to allow submitting another job via the same environment.
+		return result;
 	}
 }
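
The effect of clearing the stream graph is that one local environment can now submit several jobs in sequence. A hedged usage sketch; the job names and the trivial topologies are made up:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoJobsSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		env.fromElements(1, 2, 3).print();
		env.execute("first job");

		// because the stream graph is cleared after execute(), this second
		// topology does not re-run the operators of the first job
		env.fromElements("a", "b", "c").print();
		env.execute("second job");
	}
}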

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b0471f9..2e33b82 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
-import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
@@ -67,6 +67,8 @@ import com.esotericsoftware.kryo.Serializer;
  */
 public abstract class StreamExecutionEnvironment {
 
+	public final static String DEFAULT_JOB_NAME = "Flink Streaming Job";
+
 	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
 
 	private long bufferTimeout = 100;
@@ -624,8 +626,8 @@ public abstract class StreamExecutionEnvironment {
 
 		TypeInformation<OUT> outTypeInfo;
 
-		if (function instanceof GenericSourceFunction) {
-			outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
+		if (function instanceof ResultTypeQueryable) {
+			outTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
 		} else {
 			try {
 				outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
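
The environment now recognizes the generic ResultTypeQueryable contract instead of the connector-specific GenericSourceFunction. A minimal sketch of the pattern, assuming only the interface shown in this diff; the wrapper class is hypothetical:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;

/**
 * A component that knows its output type up front and reports it to the
 * environment, so no reflective type extraction is needed.
 */
public class ExplicitlyTyped<T> implements ResultTypeQueryable<T> {

	private final Class<T> producedClass;

	public ExplicitlyTyped(Class<T> producedClass) {
		this.producedClass = producedClass;
	}

	@Override
	public TypeInformation<T> getProducedType() {
		return TypeExtractor.getForClass(producedClass);
	}
}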

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
deleted file mode 100644
index 0113cfe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public interface GenericSourceFunction<T> {
-
-	TypeInformation<T> getType();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 93bf8eb..271c05c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -60,8 +60,7 @@ import org.slf4j.LoggerFactory;
 public class StreamGraph extends StreamingPlan {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
-	private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
-	private String jobName = DEAFULT_JOB_NAME;
+	private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
 
 	private final StreamExecutionEnvironment environemnt;
 	private final ExecutionConfig executionConfig;
@@ -70,17 +69,25 @@ public class StreamGraph extends StreamingPlan {
 	private long checkpointingInterval = 5000;
 	private boolean chaining = true;
 
-	private final Map<Integer, StreamNode> streamNodes;
-	private final Set<Integer> sources;
+	private Map<Integer, StreamNode> streamNodes;
+	private Set<Integer> sources;
 
-	private final Map<Integer, StreamLoop> streamLoops;
-	protected final Map<Integer, StreamLoop> vertexIDtoLoop;
+	private Map<Integer, StreamLoop> streamLoops;
+	protected Map<Integer, StreamLoop> vertexIDtoLoop;
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
 
 		this.environemnt = environment;
 		executionConfig = environment.getConfig();
 
+		// create an empty new stream graph.
+		clear();
+	}
+
+	/**
+	 * Remove all registered nodes etc.
+	 */
+	public void clear() {
 		streamNodes = new HashMap<Integer, StreamNode>();
 		streamLoops = new HashMap<Integer, StreamLoop>();
 		vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f66e394..24a08eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,8 +137,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
 			callUserFunction();
 		} catch (Exception e) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error("Calling user function failed due to: {}",
-						StringUtils.stringifyException(e));
+				LOG.error("Calling user function failed", e);
 			}
 			throw new RuntimeException(e);
 		}
@@ -168,7 +166,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
 		try {
 			FunctionUtils.closeFunction(userFunction);
 		} catch (Exception e) {
-			throw new RuntimeException("Error when closing the function: " + e.getMessage());
+			throw new RuntimeException("Error when closing the function", e);
 		}
 	}
 
@@ -187,8 +185,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
 	public void setChainingStrategy(ChainingStrategy strategy) {
 		if (strategy == ChainingStrategy.ALWAYS) {
 			if (!(this instanceof ChainableStreamOperator)) {
-				throw new RuntimeException(
-						"Operator needs to extend ChainableOperator to be chained");
+				throw new RuntimeException("Operator needs to extend ChainableOperator to be chained");
 			}
 		}
 		this.chainingStrategy = strategy;

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
deleted file mode 100644
index 4dd4b45..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionableState.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import org.apache.flink.runtime.state.OperatorState;
-
-/**
- * Base class for representing operator states that can be repartitioned for
- * state redistribution and load balancing.
- * 
- * @param <T>
- *            The type of the operator state.
- */
-public abstract class PartitionableState<T> extends OperatorState<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	PartitionableState(T initialState) {
-		super(initialState);
-	}
-
-	/**
-	 * Repartitions(divides) the current state into the given number of new
-	 * partitions. The created partitions will be used to redistribute then
-	 * rebuild the state among the parallel instances of the operator. The
-	 * implementation should reflect the partitioning of the input values to
-	 * maintain correct operator behavior.
-	 * 
- * <br/> <br/> It is also assumed that if we would {@link #reBuild} the
-	 * repartitioned state we would basically get the same as before.
-	 * 
-	 * 
-	 * @param numberOfPartitions
-	 *            The desired number of partitions. The method must return an
-	 *            array of that size.
-	 * @return The array containing the state part for each partition.
-	 */
-	public abstract OperatorState<T>[] repartition(int numberOfPartitions);
-
-	/**
-	 * Rebuilds the current state partition from the given parts. Used for
-	 * building the state after a re-balance phase.
-	 * 
-	 * @param parts
-	 *            The state parts that will be used to rebuild the current
-	 *            partition.
-	 * @return The rebuilt operator state.
-	 */
-	public abstract OperatorState<T> reBuild(OperatorState<T>... parts);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 930c9b4..0259568 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -273,7 +273,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
 		synchronized (checkpointLock) {
 			if (isRunning) {
 				try {
-					LOG.info("Starting checkpoint " + checkpointId);
+					LOG.info("Starting checkpoint {} on task {}", checkpointId, getName());
 					
 					// first draw the state that should go into checkpoint
 					LocalStateHandle state;
@@ -282,7 +282,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
 						state = userState == null ? null : new LocalStateHandle(userState);
 					}
 					catch (Exception e) {
-						throw new Exception("Error while drawing snapshot of the user state.");
+						throw new Exception("Error while drawing snapshot of the user state.", e);
 					}
 			
 					// now emit the checkpoint barriers
@@ -333,8 +333,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
 				triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
 			}
 			catch (Exception e) {
-				throw new RuntimeException(
-						"Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+				throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index faaa79b..87c9757 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
 import java.io.Serializable;
 
-public interface DeserializationSchema<T> extends Serializable {
+public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
 	/**
 	 * Deserializes the incoming data.
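
With ResultTypeQueryable in the supertype list, every schema now carries its produced type, as the built-in schemas below illustrate. A hedged sketch of a user-defined schema; the isEndOfStream(T) method is assumed from the existing interface, and the decimal-string encoding is an assumption:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class LongSchema implements DeserializationSchema<Long> {

	private static final long serialVersionUID = 1L;

	@Override
	public Long deserialize(byte[] message) {
		// assumes the producer wrote the value as a decimal string
		return Long.valueOf(new String(message));
	}

	@Override
	public boolean isEndOfStream(Long nextElement) {
		return false; // unbounded stream
	}

	@Override
	public TypeInformation<Long> getProducedType() {
		return TypeExtractor.getForClass(Long.class);
	}
}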

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
index 93d13ab..a4b1419 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
 
@@ -38,4 +40,9 @@ public class JavaDefaultStringSchema implements DeserializationSchema<String>, S
 		return SerializationUtils.deserialize(message);
 	}
 
+	@Override
+	public TypeInformation<String> getProducedType() {
+		return TypeExtractor.getForClass(String.class);
+	}
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
index e457bef..9c5885f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 public class RawSchema implements DeserializationSchema<byte[]>,
 		SerializationSchema<byte[], byte[]> {
 
@@ -36,4 +39,9 @@ public class RawSchema implements DeserializationSchema<byte[]>,
 	public byte[] serialize(byte[] element) {
 		return element;
 	}
+
+	@Override
+	public TypeInformation<byte[]> getProducedType() {
+		return TypeExtractor.getForClass(byte[].class);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 3d0a0d5..7c5946d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 public class SimpleStringSchema implements DeserializationSchema<String>,
 		SerializationSchema<String, String> {
 
@@ -37,4 +40,8 @@ public class SimpleStringSchema implements DeserializationSchema<String>,
 		return element;
 	}
 
+	@Override
+	public TypeInformation<String> getProducedType() {
+		return TypeExtractor.getForClass(String.class);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
deleted file mode 100644
index 136a091..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/OperatorStateTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.junit.Test;
-
-public class OperatorStateTest {
-
-	@Test
-	public void testOperatorState() {
-		OperatorState<Integer> os = new OperatorState<Integer>(5);
-
-		StateCheckpoint<Integer> scp = os.checkpoint();
-
-		assertTrue(os.stateEquals(scp.restore()));
-
-		assertEquals((Integer) 5, os.getState());
-
-		os.update(10);
-
-		assertEquals((Integer) 10, os.getState());
-	}
-
-}