You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by rmetzger <gi...@git.apache.org> on 2015/12/04 17:35:38 UTC

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/1437

    [FLINK-3102] Allow reading from multiple topics with one FlinkKafkaConsumer

    * I added a new constructor to the Kafka source, accepting a List<String> of topics.
    * the deserialization schema will get the topic as a String.
    
    I'm opening the PR for review now, the tests pass locally, I'll verify the changes on a real cluster as well.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink3102

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1437.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1437
    
----
commit 58236653438ff003ba6026403098e0e599b5cce5
Author: Robert Metzger <rm...@apache.org>
Date:   2015-12-03T15:32:55Z

    [FLINK-3102] Allow reading from multiple topics with one FlinkKafkaConsumer instance

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1437#issuecomment-162525731
  
    Aside from minor comments and the style issue it looks good. I'm not too familiar with the internals of Kafka (which we are touching here, a bit), though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46814334
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -516,10 +510,12 @@ public void close() throws Exception {
     
     		if (LOG.isDebugEnabled()) {
     			LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
    -					Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
    +					KafkaTopicPartition.toString(lastOffsets), checkpointId, checkpointTimestamp);
     		}
     
    -		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
    +		// the use of clone() is okay here is okay, we just need a new map, the keys are not changed
    +		//noinspection unchecked
    +		HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) lastOffsets.clone();
    --- End diff --
    
    Why don't you use `new HashMap<>(lastOffsets)` here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46813501
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java ---
    @@ -0,0 +1,117 @@
    +/*
    --- End diff --
    
    The file has inconsistent control flow style, some of the `if` have a space before opening parenthesis the `for` loops don't have a space.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46841698
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A serializable representation of a kafka topic and a partition.
    + * Used as an operator state for the Kafka consumer
    + */
    +public class KafkaTopicPartition implements Serializable {
    +
    +	private static final long serialVersionUID = 722083576322742325L;
    +
    +	private String topic;
    +	private int partition;
    +
    +	public KafkaTopicPartition(String topic, int partition) {
    +		this.topic = topic;
    +		this.partition = partition;
    +	}
    +
    +	public String getTopic() {
    +		return topic;
    +	}
    +
    +	public int getPartition() {
    +		return partition;
    +	}
    +
    +	public void setTopic(String topic) {
    +		this.topic = topic;
    +	}
    +
    +	public void setPartition(int partition) {
    +		this.partition = partition;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "KafkaTopicPartition{" +
    +				"topic='" + topic + '\'' +
    +				", partition=" + partition +
    +				'}';
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || !(o instanceof KafkaTopicPartition)) {
    +			return false;
    +		}
    +
    +		KafkaTopicPartition that = (KafkaTopicPartition) o;
    +
    +		if (partition != that.partition) {
    +			return false;
    +		}
    +		return !(topic != null ? !topic.equals(that.topic) : that.topic != null);
    --- End diff --
    
    Make topic a "non null" fields (check in the constructor) and save a few cycles here...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46836686
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -25,6 +25,7 @@
     import kafka.javaapi.consumer.SimpleConsumer;
     import org.apache.commons.collections.map.LinkedMap;
     
    +import org.apache.commons.lang.StringUtils;
    --- End diff --
    
    I will exclude all lang imports using a checkstyle rule.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46838706
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A serializable representation of a kafka topic and a partition.
    + * Used as an operator state for the Kafka consumer
    + */
    +public class KafkaTopicPartition implements Serializable {
    --- End diff --
    
    Can this be an immutable type?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46840520
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -640,30 +642,38 @@ public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer)
     		@Override
     		public void run() {
     			try {
    +
     				while (running) {
     					try {
     						Thread.sleep(commitInterval);
     						//  ------------  commit current offsets ----------------
     
     						// create copy of current offsets
    -						long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    -
    -						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    -						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    -							int partition = tp.partition();
    -							long offset = currentOffsets[partition];
    -							long lastCommitted = consumer.commitedOffsets[partition];
    -
    +						HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
    +
    +						Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						for (KafkaTopicPartition tp : (List<KafkaTopicPartition>)consumer.subscribedPartitions) {
    --- End diff --
    
    Fixed. I forgot to set the type parameter for the `consumer` reference, that's why I did excessive casing here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1437#issuecomment-162505050
  
    The style of control-flow statements is not consistent, some have space before opening parenthesis and some don't.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1437#issuecomment-162459684
  
    Did you only change `KeyedDeserializationSchema`, not `SerializationSchema`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46813524
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java ---
    @@ -0,0 +1,76 @@
    +/*
    --- End diff --
    
    Inconsistent control flow parenthesis style.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46837638
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -563,37 +559,44 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
     					return;
     				}
    -	
    -				checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
    +
    +				//noinspection unchecked
    +				checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
    +
     				
     				// remove older checkpoints in map
     				for (int i = 0; i < posInMap; i++) {
     					pendingCheckpoints.remove(0);
     				}
     			}
    -	
    -			if (LOG.isInfoEnabled()) {
    -				LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
    +			if(checkpointOffsets.size() == 0) {
    +				LOG.info("Checkpoint state was empty.");
    +				return;
     			}
     	
     			// build the map of (topic,partition) -> committed offset
    -			Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    -			for (TopicPartition tp : subscribedPartitions) {
    -				
    -				int partition = tp.partition();
    -				long offset = checkpointOffsets[partition];
    -				long lastCommitted = commitedOffsets[partition];
    -				
    +			Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
    +			for (KafkaTopicPartition tp : subscribedPartitions) {
    +				Long offset = checkpointOffsets.get(tp);
    +				Long lastCommitted = commitedOffsets.get(tp);
    +				if(lastCommitted == null) {
    +					lastCommitted = OFFSET_NOT_SET;
    +				}
     				if (offset != OFFSET_NOT_SET) {
     					if (offset > lastCommitted) {
     						offsetsToCommit.put(tp, offset);
    -						LOG.debug("Committing offset {} for partition {}", offset, partition);
    +						//noinspection unchecked
    +						commitedOffsets.put(tp, offset);
    +						LOG.debug("Committing offset {} for {}", offset, tp.toString());
     					}
     					else {
    -						LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +						LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, tp);
     					}
     				}
     			}
    +			if(LOG.isInfoEnabled() && offsetsToCommit.size() > 0) {
    --- End diff --
    
    Is this a good thing on INFO level? Will be a lot of contents in the logs, especially when running frequent checkpoints. I would vote to log this on DEBUG...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46835856
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -516,10 +510,12 @@ public void close() throws Exception {
     
     		if (LOG.isDebugEnabled()) {
     			LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
    -					Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
    +					KafkaTopicPartition.toString(lastOffsets), checkpointId, checkpointTimestamp);
     		}
     
    -		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
    +		// the use of clone() is okay here is okay, we just need a new map, the keys are not changed
    +		//noinspection unchecked
    +		HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) lastOffsets.clone();
    --- End diff --
    
    Clone is probably a bit more efficient even, as does not hash but only copy the table.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46837807
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -640,30 +642,38 @@ public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer)
     		@Override
     		public void run() {
     			try {
    +
     				while (running) {
     					try {
     						Thread.sleep(commitInterval);
     						//  ------------  commit current offsets ----------------
     
     						// create copy of current offsets
    -						long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    -
    -						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    -						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    -							int partition = tp.partition();
    -							long offset = currentOffsets[partition];
    -							long lastCommitted = consumer.commitedOffsets[partition];
    -
    +						HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
    --- End diff --
    
    Code duplication, can go into a util method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46832880
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -25,6 +25,7 @@
     import kafka.javaapi.consumer.SimpleConsumer;
     import org.apache.commons.collections.map.LinkedMap;
     
    +import org.apache.commons.lang.StringUtils;
    --- End diff --
    
    Can we consistently use the classes from "lang3" instread of "lang" v2?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46836934
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -563,37 +559,44 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
     					return;
     				}
    -	
    -				checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
    +
    +				//noinspection unchecked
    +				checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
    +
     				
     				// remove older checkpoints in map
     				for (int i = 0; i < posInMap; i++) {
     					pendingCheckpoints.remove(0);
     				}
     			}
    -	
    -			if (LOG.isInfoEnabled()) {
    -				LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
    +			if(checkpointOffsets.size() == 0) {
    --- End diff --
    
    I think this is a bug. If the list is empty after you remove the checkpoint to commit, it only means that no pending checkpoint was subsumed. It should still commit the offsets to Kafka.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46840459
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +import org.apache.kafka.common.Node;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +/**
    + * Serializable Topic Partition info with leader Node information.
    + * This class is used at runtime.
    + *
    + * The leader information does not account for equality!
    + */
    +public class KafkaTopicPartitionLeader extends KafkaTopicPartition implements Serializable {
    --- End diff --
    
    This should be an immutable type as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46841544
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A serializable representation of a kafka topic and a partition.
    + * Used as an operator state for the Kafka consumer
    + */
    +public class KafkaTopicPartition implements Serializable {
    +
    +	private static final long serialVersionUID = 722083576322742325L;
    +
    +	private String topic;
    +	private int partition;
    +
    +	public KafkaTopicPartition(String topic, int partition) {
    +		this.topic = topic;
    +		this.partition = partition;
    +	}
    +
    +	public String getTopic() {
    +		return topic;
    +	}
    +
    +	public int getPartition() {
    +		return partition;
    +	}
    +
    +	public void setTopic(String topic) {
    +		this.topic = topic;
    +	}
    +
    +	public void setPartition(int partition) {
    +		this.partition = partition;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "KafkaTopicPartition{" +
    +				"topic='" + topic + '\'' +
    +				", partition=" + partition +
    +				'}';
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || !(o instanceof KafkaTopicPartition)) {
    --- End diff --
    
    Redundant, "instanceof" checks for null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1437#issuecomment-162580468
  
    All in all quite good. Few things:
    
      - The code style is super heterogeneous, sometimes it puts spaces after a keyword, sometimes it does not. Would be good to make this straight.
    
      - This mixes "Arrays.toString" and "StringUtils.join". Why not avail any dependency on external libraries (with all the dependency conflicts, it is nice to avoid where possible) and use
         - `Arrays.toString()` for arrays.
         - Simply `toString()` for lists.
    
      - `KafkaTopicPartition` and `KafkaTopicPartitionLeader` need to be fast for hash code and equals. They are used as hash map keys for every record (offset update). Due to that, they should really be immutable types and cache their hash code, since that is used for any record emitted (update in the offsets hash map). Also, equals() should be fast.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46835390
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -303,25 +300,29 @@ public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializ
     			validateZooKeeperConfig(props);
     		}
     		
    -		// Connect to a broker to get the partitions
    -		List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
    +		// Connect to a broker to get the partitions for all topics
    +		this.partitionInfos = getPartitionsForTopic(topics, props);
     
     		if (partitionInfos.size() == 0) {
    -			throw new RuntimeException("Unable to retrieve any partitions for topic " + topic + "." +
    +			throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + StringUtils.join(topics, ",") + "." +
     					"Please check previous log entries");
     		}
     
    -		// get initial partitions list. The order of the partitions is important for consistent 
    -		// partition id assignment in restart cases.
    -		this.partitions = new int[partitionInfos.size()];
    -		for (int i = 0; i < partitionInfos.size(); i++) {
    -			partitions[i] = partitionInfos.get(i).partition();
    -			
    -			if (partitions[i] >= partitions.length) {
    -				throw new RuntimeException("Kafka partition numbers are sparse");
    +		Map<String, Integer> countPerTopic = new HashMap<>();
    --- End diff --
    
    How about guarding this section with `if (LOG.isInfoEnabled())`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46839322
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -640,30 +642,38 @@ public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer)
     		@Override
     		public void run() {
     			try {
    +
     				while (running) {
     					try {
     						Thread.sleep(commitInterval);
     						//  ------------  commit current offsets ----------------
     
     						// create copy of current offsets
    -						long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    -
    -						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    -						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    -							int partition = tp.partition();
    -							long offset = currentOffsets[partition];
    -							long lastCommitted = consumer.commitedOffsets[partition];
    -
    +						HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
    +
    +						Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						for (KafkaTopicPartition tp : (List<KafkaTopicPartition>)consumer.subscribedPartitions) {
    --- End diff --
    
    A bit unclean to cast here from `List<KafkaTopicPartitionLeader>` to `List<KafkaTopicPartition>`. Why not simply iterate with the `KafkaTopicPartitionLeader` type?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1437#issuecomment-162640162
  
    Looks pretty good.
    
    +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46837302
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -222,18 +219,18 @@
     	/** The committer that persists the committed offsets */
     	private transient OffsetHandler offsetHandler;
     	
    -	/** The partitions actually handled by this consumer */
    -	private transient List<TopicPartition> subscribedPartitions;
    +	/** The partitions actually handled by this consumer at runtime */
    +	private transient List<KafkaTopicPartitionLeader> subscribedPartitions;
     
     	/** The offsets of the last returned elements */
    -	private transient long[] lastOffsets;
    +	private transient HashMap<KafkaTopicPartition, Long> lastOffsets;
     
     	/** The latest offsets that have been committed to Kafka or ZooKeeper. These are never
     	 * newer then the last offsets (Flink's internal view is fresher) */
    -	private transient long[] commitedOffsets;
    +	private transient HashMap<KafkaTopicPartition, Long> commitedOffsets;
    --- End diff --
    
    Type in the variable name: "committed"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1437#issuecomment-162634935
  
    Thank you for the good reviews!
    I've addressed all comments.
    The entire kafka connector is now using a space after `if` and `for`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46837841
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -640,30 +642,38 @@ public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer)
     		@Override
     		public void run() {
     			try {
    +
     				while (running) {
     					try {
     						Thread.sleep(commitInterval);
     						//  ------------  commit current offsets ----------------
     
     						// create copy of current offsets
    -						long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    -
    -						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    -						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    -							int partition = tp.partition();
    -							long offset = currentOffsets[partition];
    -							long lastCommitted = consumer.commitedOffsets[partition];
    -
    +						HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
    +
    +						Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						for (KafkaTopicPartition tp : (List<KafkaTopicPartition>)consumer.subscribedPartitions) {
    +							long offset = currentOffsets.get(tp);
    +							Long lastCommitted = (Long)consumer.commitedOffsets.get(tp);
    +							if(lastCommitted == null) {
    +								lastCommitted = OFFSET_NOT_SET;
    +							}
     							if (offset != OFFSET_NOT_SET) {
     								if (offset > lastCommitted) {
     									offsetsToCommit.put(tp, offset);
    -									LOG.debug("Committing offset {} for partition {}", offset, partition);
    +									//noinspection unchecked
    +									consumer.commitedOffsets.put(tp, offset);
    +									LOG.debug("Committing offset {} for partition {}", offset, tp);
     								} else {
    -									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, tp);
     								}
     							}
     						}
     
    +						if(LOG.isInfoEnabled() && offsetsToCommit.size() > 0) {
    --- End diff --
    
    Same as above: Info logging may be pretty aggressive...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46837508
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -563,37 +559,44 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
     					return;
     				}
    -	
    -				checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
    +
    +				//noinspection unchecked
    +				checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
    +
     				
     				// remove older checkpoints in map
     				for (int i = 0; i < posInMap; i++) {
     					pendingCheckpoints.remove(0);
     				}
     			}
    -	
    -			if (LOG.isInfoEnabled()) {
    -				LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
    +			if(checkpointOffsets.size() == 0) {
    +				LOG.info("Checkpoint state was empty.");
    +				return;
     			}
     	
     			// build the map of (topic,partition) -> committed offset
    -			Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    -			for (TopicPartition tp : subscribedPartitions) {
    -				
    -				int partition = tp.partition();
    -				long offset = checkpointOffsets[partition];
    -				long lastCommitted = commitedOffsets[partition];
    -				
    +			Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
    +			for (KafkaTopicPartition tp : subscribedPartitions) {
    +				Long offset = checkpointOffsets.get(tp);
    +				Long lastCommitted = commitedOffsets.get(tp);
    +				if(lastCommitted == null) {
    +					lastCommitted = OFFSET_NOT_SET;
    +				}
     				if (offset != OFFSET_NOT_SET) {
     					if (offset > lastCommitted) {
     						offsetsToCommit.put(tp, offset);
    -						LOG.debug("Committing offset {} for partition {}", offset, partition);
    +						//noinspection unchecked
    --- End diff --
    
    Inspection suppression should not be needed...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1437#issuecomment-162465233
  
    The `KeyedDeserializationSchema` is a new interface introduced in 1.0. I don't want to break the existing `DeserializationSchema` with the change.
    Even before the change, the `KeyedDeserializationSchema` was exposing more than the DeserialziationSchema (the offset), it can be seen as the Rich* variant of the DeserSchema.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46837656
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java ---
    @@ -61,15 +53,15 @@
     	 *     }
     	 * }
     	 * }</pre>
    -	 * 
    +	 *
    +	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
     	 * @param sourceContext The source context to emit elements to.
     	 * @param valueDeserializer The deserializer to decode the raw values with.
    -	 * @param lastOffsets The array into which to store the offsets for which elements are emitted (operator state)
    -	 * 
    -	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
    +	 * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state)
    +*
    --- End diff --
    
    Fixed the `*`.
    The java standard library is putting generic parameters first.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46804012
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java ---
    @@ -61,15 +53,15 @@
     	 *     }
     	 * }
     	 * }</pre>
    -	 * 
    +	 *
    +	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
     	 * @param sourceContext The source context to emit elements to.
     	 * @param valueDeserializer The deserializer to decode the raw values with.
    -	 * @param lastOffsets The array into which to store the offsets for which elements are emitted (operator state)
    -	 * 
    -	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
    +	 * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state)
    +*
    --- End diff --
    
    Strange formatting of `*`. Did you check whether the generic param should be before normal params or why did you move?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46836078
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -372,34 +372,28 @@ public void open(Configuration parameters) throws Exception {
     				throw new RuntimeException("Requested unknown offset store " + offsetStore);
     		}
     		
    -		// set up operator state
    -		lastOffsets = new long[partitions.length];
    -		commitedOffsets = new long[partitions.length];
    -		
    -		Arrays.fill(lastOffsets, OFFSET_NOT_SET);
    -		Arrays.fill(commitedOffsets, OFFSET_NOT_SET);
    -		
    +		commitedOffsets = new HashMap<>();
    +
     		// seek to last known pos, from restore request
     		if (restoreToOffset != null) {
     			if (LOG.isInfoEnabled()) {
    -				LOG.info("Consumer {} found offsets from previous checkpoint: {}",
    -						thisComsumerIndex,  Arrays.toString(restoreToOffset));
    +				LOG.info("Consumer {} is restored from previous checkpoint: {}",
    +						thisComsumerIndex, KafkaTopicPartition.toString(restoreToOffset));
     			}
     			
    -			for (int i = 0; i < restoreToOffset.length; i++) {
    -				long restoredOffset = restoreToOffset[i];
    -				if (restoredOffset != OFFSET_NOT_SET) {
    -					// if this fails because we are not subscribed to the topic, then the
    -					// partition assignment is not deterministic!
    -					
    -					// we set the offset +1 here, because seek() is accepting the next offset to read,
    -					// but the restore offset is the last read offset
    -					fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
    -					lastOffsets[i] = restoredOffset;
    -				}
    +			for(Map.Entry<KafkaTopicPartition, Long> restorePartition: restoreToOffset.entrySet()) {
    +				// seek fetcher to restore position
    +				// we set the offset +1 here, because seek() is accepting the next offset to read,
    +				// but the restore offset is the last read offset
    +				fetcher.seek(restorePartition.getKey(), restorePartition.getValue() + 1);
     			}
    +			// initialize offsets with restored state
    +			this.lastOffsets = restoreToOffset;
    --- End diff --
    
    For GC friendlyness, how about setting `restoreToOffset = null;`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46841758
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +import org.apache.kafka.common.Node;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +/**
    + * Serializable Topic Partition info with leader Node information.
    + * This class is used at runtime.
    + *
    + * The leader information does not account for equality!
    + */
    +public class KafkaTopicPartitionLeader extends KafkaTopicPartition implements Serializable {
    +
    +	private static final long serialVersionUID = 9145855900303748582L;
    +
    +	private final int leaderId;
    +	private int leaderPort;
    +	private String leaderHost;
    +
    +	public KafkaTopicPartitionLeader(String topic, int partition, Node leader) {
    +		super(topic, partition);
    +		if(leader == null) {
    +			this.leaderId = -1;
    +		} else {
    +			this.leaderId = leader.id();
    +			this.leaderPort = leader.port();
    +			this.leaderHost = leader.host();
    +		}
    +	}
    +
    +
    +	public Node getLeader() {
    +		if(this.leaderId == -1) {
    +			return null;
    +		} else {
    +			return new Node(leaderId, leaderHost, leaderPort);
    +		}
    +	}
    +
    +	public static Object toString(List<KafkaTopicPartitionLeader> partitions) {
    +		StringBuilder sb = new StringBuilder();
    +		for(KafkaTopicPartitionLeader p: partitions) {
    +			sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
    +		}
    +		return sb.toString();
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		return super.equals(o);
    --- End diff --
    
    This is ignoring the leader IDs. Seems wrong...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1437


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3102] Allow reading from multiple topic...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46840339
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A serializable representation of a kafka topic and a partition.
    + * Used as an operator state for the Kafka consumer
    + */
    +public class KafkaTopicPartition implements Serializable {
    --- End diff --
    
    Good catch!
    I'll change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---