Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/09 13:42:29 UTC

[GitHub] [iceberg] hililiwei opened a new pull request, #6160: Flink: Support locality with LocalitySplitAssigner

hililiwei opened a new pull request, #6160:
URL: https://github.com/apache/iceberg/pull/6160

   Create a locality-aware assigner that hands out splits with a locality guarantee.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1111390540


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -236,8 +236,6 @@ public FlinkInputFormat buildFormat() {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
 
-      contextBuilder.exposeLocality(

Review Comment:
   This should be done by `resolveConfig`.
   
   
   





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1128203275


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {
+      Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next();
+      Deque<IcebergSourceSplit> splits = splitsEntry.getValue();
+      if (splits.isEmpty()) {
+        splitsIterator.remove();
+        continue;
+      }
+
+      if (splitsEntry.getKey().contains(hostname)) {
+        return splits;
+      }
+    }
+
+    if (!splitsDeque.isEmpty()) {
+      return splitsDeque.values().stream().findAny().get();
+    }
+
+    return new ArrayDeque<>();

Review Comment:
   got it





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1110438354


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {

Review Comment:
   I am not sure this is the right approach. It is `O(N)` to fetch a split, where `N` is the number of unique sets (of hosts). Let's try to estimate `N` with a simple case where each split contains only one file. Assume an HDFS cluster has `m` hosts with a replication factor of `k`. `N` (the number of unique host sets) could be as large as `C(m, k)`.
   
   cc @pvary 
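A minimal sketch of the scan this comment is about, assuming pending splits are grouped by their host set as in the diff above. Splits are modelled as plain `String` ids, and `LinearScanDemo`, `ScanResult`, and `findLocal` are hypothetical names, not Iceberg APIs. The visit counter makes the `O(N)` cost visible: a host that only appears in the last group forces a walk over every entry.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the reviewed layout: host set -> queue of split ids.
class LinearScanDemo {
  // Result of one lookup: the matching queue (possibly empty) and how many entries were inspected.
  static final class ScanResult {
    final Deque<String> splits;
    final int visited;

    ScanResult(Deque<String> splits, int visited) {
      this.splits = splits;
      this.visited = visited;
    }
  }

  // Walks entries in iteration order until a non-empty group lists the host.
  // Cost is O(N), where N is the number of distinct host sets in the map.
  static ScanResult findLocal(Map<Set<String>, Deque<String>> pending, String host) {
    int visited = 0;
    for (Map.Entry<Set<String>, Deque<String>> entry : pending.entrySet()) {
      visited++;
      if (host != null && !entry.getValue().isEmpty() && entry.getKey().contains(host)) {
        return new ScanResult(entry.getValue(), visited);
      }
    }
    return new ScanResult(new ArrayDeque<>(), visited);
  }
}
```

Keying the map by individual host instead, as the later revision of this PR does with `Map<String, Set<IcebergSourceSplit>>`, turns the local lookup into a single hash probe.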





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1111418712


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {
+      Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next();
+      Deque<IcebergSourceSplit> splits = splitsEntry.getValue();
+      if (splits.isEmpty()) {
+        splitsIterator.remove();
+        continue;
+      }
+
+      if (splitsEntry.getKey().contains(hostname)) {
+        return splits;
+      }
+    }
+
+    if (!splitsDeque.isEmpty()) {
+      return splitsDeque.values().stream().findAny().get();
+    }
+
+    return new ArrayDeque<>();

Review Comment:
   In the code above, if there are splits that haven't been allocated yet, it returns one of them.
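The fallback described here can be modelled in a few lines. This is a simplified, hypothetical sketch (`FallbackAssignerDemo` is not part of the PR) that uses `String` split ids in place of `IcebergSourceSplit`: it prefers a group whose host set contains the requesting host, prunes empty groups while scanning, and otherwise hands out a split from the first non-empty group it saw.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model: host set -> queue of split ids, with a non-local fallback.
class FallbackAssignerDemo {
  private final Map<Set<String>, Deque<String>> pending = new LinkedHashMap<>();

  void add(Set<String> hosts, String split) {
    pending.computeIfAbsent(hosts, k -> new ArrayDeque<>()).add(split);
  }

  // Returns a split local to the host if possible, otherwise any remaining split, or null if none remain.
  String next(String host) {
    Deque<String> fallback = null;
    Iterator<Map.Entry<Set<String>, Deque<String>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Set<String>, Deque<String>> entry = it.next();
      if (entry.getValue().isEmpty()) {
        it.remove(); // prune drained groups, as the reviewed loop does
        continue;
      }
      if (host != null && entry.getKey().contains(host)) {
        return entry.getValue().poll(); // locality hit
      }
      if (fallback == null) {
        fallback = entry.getValue(); // remember the first non-local, non-empty group
      }
    }
    return fallback == null ? null : fallback.poll();
  }
}
```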





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1128152554


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));

Review Comment:
   Yes, I meant a shallow copy; the split objects are not cloned. I was saying we could avoid the shallow copy too, but I guess it is not important.
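To illustrate the distinction: collecting the stream in the constructor (`assignerState.stream().map(IcebergSourceSplitState::split)` followed by `collect(Collectors.toList())`) builds a new list, but that list holds references to the same split objects. The sketch below uses a hypothetical `Split` stand-in to show that only the container is new, and that handing elements straight to a consumer skips even that intermediate list.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

class ShallowCopyDemo {
  // Hypothetical stand-in for IcebergSourceSplit.
  static final class Split {
    final String id;

    Split(String id) {
      this.id = id;
    }
  }

  // Shallow copy: a new List instance whose elements are the same Split objects.
  static List<Split> copyViaStream(List<Split> source) {
    return source.stream().collect(Collectors.toList());
  }

  // No intermediate list: each element is passed to the consumer directly.
  static void forwardWithoutCopy(List<Split> source, Consumer<Split> sink) {
    source.forEach(sink);
  }
}
```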





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1128155065


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A split assigner that assigns splits to subtasks based on the locality of the splits. */
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {

Review Comment:
   I think this needs to be synchronized, and so do some other methods, since reads and writes on `pendingSplits` should be protected.
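A minimal sketch of that suggestion, assuming a plain `HashMap` keyed by host (the class and method names are hypothetical, and `String` ids stand in for splits): every method that reads or writes the shared map is `synchronized` on the same instance, so concurrent `getNext` and `addSplit` calls cannot interleave inside a `HashMap` operation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class SynchronizedAssignerDemo {
  // Shared mutable state; every access below holds this object's monitor.
  private final Map<String, Deque<String>> pendingSplits = new HashMap<>();

  synchronized void addSplit(String host, String split) {
    pendingSplits.computeIfAbsent(host, h -> new ArrayDeque<>()).add(split);
  }

  // Returns the next split for the host, or null if none is pending there.
  synchronized String getNext(String host) {
    Deque<String> splits = pendingSplits.get(host);
    if (splits == null) {
      return null;
    }
    String split = splits.poll();
    if (splits.isEmpty()) {
      pendingSplits.remove(host); // drop drained queues so stale keys do not accumulate
    }
    return split;
  }

  synchronized int pendingCount() {
    int count = 0;
    for (Deque<String> splits : pendingSplits.values()) {
      count += splits.size();
    }
    return count;
  }
}
```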





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1152870720


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A split assigner that assigns splits to subtasks based on the locality of the splits. */
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<String, Set<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public synchronized GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    IcebergSourceSplit split =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME)
+            : getIcebergSourceSplits(hostname);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    return split != null ? GetSplitResult.forSplit(split) : GetSplitResult.unavailable();
+  }
+
+  private IcebergSourceSplit getIcebergSourceSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = getSplits(hostname);
+
+    if (icebergSourceSplits != null) {
+      Optional<IcebergSourceSplit> first = icebergSourceSplits.stream().findFirst();
+      if (first.isPresent()) {
+        pendingSplits.values().forEach(splitSet -> splitSet.remove(first.get()));
+        return first.get();
+      }
+    }
+
+    return null;
+  }
+
+  private Set<IcebergSourceSplit> getSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = pendingSplits.get(hostname);
+    if (icebergSourceSplits != null) {
+      if (!icebergSourceSplits.isEmpty()) {
+        return icebergSourceSplits;
+      }
+    }
+
+    pendingSplits.remove(hostname);

Review Comment:
   First, try to get the split collection for the hostname key from the map. If it is not empty, return it. Otherwise, remove the hostname from the map (its value may be null or empty), and then look for any non-empty split collection.
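The lookup described above can be sketched with a host-keyed map, where a split is indexed under every host that holds a replica and is removed from all of them once assigned (as in `pendingSplits.values().forEach(splitSet -> splitSet.remove(...))` in the diff). This is a hypothetical model with `String` ids; the fallback loop after `pendingSplits.remove(hostname)` is cut off in the diff, so the "return the first non-empty set and prune empty ones" behavior here is an assumption.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class HostKeyedAssignerDemo {
  // host -> splits with a replica on that host; a split appears under each of its hosts.
  private final Map<String, Set<String>> pendingSplits = new HashMap<>();

  synchronized void addSplit(String split, Set<String> hosts) {
    for (String host : hosts) {
      pendingSplits.computeIfAbsent(host, h -> new LinkedHashSet<>()).add(split);
    }
  }

  // Use the host's own set if non-empty; else drop that key and fall back to any non-empty set.
  private Set<String> getSplits(String host) {
    Set<String> local = pendingSplits.get(host);
    if (local != null && !local.isEmpty()) {
      return local;
    }
    pendingSplits.remove(host);
    Iterator<Set<String>> it = pendingSplits.values().iterator();
    while (it.hasNext()) {
      Set<String> candidate = it.next();
      if (!candidate.isEmpty()) {
        return candidate;
      }
      it.remove(); // assumption: prune empty sets found during the fallback scan
    }
    return null;
  }

  synchronized String getNext(String host) {
    Set<String> splits = getSplits(host);
    if (splits == null) {
      return null;
    }
    String split = splits.iterator().next();
    // The split is indexed under every replica host, so remove it everywhere.
    for (Set<String> set : pendingSplits.values()) {
      set.remove(split);
    }
    return split;
  }
}
```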





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1110438354


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {

Review Comment:
   I am not sure this is the right approach. Fetching a split is `O(N)`, where `N` is the number of unique sets of hosts. Let's try to estimate `N` with a simple case where each split contains only one file. Assume the HDFS cluster has `m` hosts and a replication factor of `k`. Then `N` (the number of unique sets) could be as large as `C(m, k)`; e.g. `C(100, 3)` yields `161,700`.
   
   cc @pvary 
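As a quick check of the estimate above: with `m` hosts and replication factor `k`, the number of distinct replica host sets is bounded by C(m, k). The following minimal Java sketch computes that bound exactly. `ReplicaSetCount` is a hypothetical helper and is not part of the PR.

```java
/** Hypothetical helper (not part of the PR): counts possible distinct replica host sets, C(m, k). */
final class ReplicaSetCount {
  private ReplicaSetCount() {}

  /** Computes C(n, k) exactly, as long as the intermediate products fit in a long. */
  static long choose(int n, int k) {
    if (k < 0 || k > n) {
      return 0L;
    }
    int r = Math.min(k, n - k); // C(n, k) == C(n, n - k), so loop over the smaller one
    long result = 1L;
    for (int i = 1; i <= r; i++) {
      // After this step result == C(n - r + i, i), so the division is always exact.
      result = result * (n - r + i) / i;
    }
    return result;
  }

  public static void main(String[] args) {
    // m = 100 hosts, replication factor k = 3
    System.out.println(choose(100, 3)); // prints 161700
  }
}
```

A lookup that scans every map entry therefore scales with this count, not with the number of hosts.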



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1111387968


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java:
##########
@@ -94,14 +94,17 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
       ConfigOptions.key("table.exec.iceberg.split-assigner-type")
           .enumType(SplitAssignerType.class)
-          .defaultValue(SplitAssignerType.SIMPLE)
+          .noDefaultValue()

Review Comment:
   We cannot set a default value. When the user does not configure the option, we need it to be null, so that we can decide whether locality can be inferred by checking the table's I/O type.
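The reasoning above can be shown with a small resolution function. An explicit setting always wins. Only an unset (null) option falls through to inference from the I/O type. This is a minimal sketch: `AssignerKind` is a hypothetical stand-in for `SplitAssignerType`, and `exposeLocality` is assumed to be computed elsewhere from the table's FileIO.

```java
/** Hypothetical stand-in for SplitAssignerType; the real enum also provides a factory. */
enum AssignerKind {
  SIMPLE,
  LOCALITY
}

/** Sketch of resolving an option that intentionally has no default value. */
final class AssignerResolution {
  private AssignerResolution() {}

  /**
   * @param configured the user's explicit choice, or null when the option was not set
   * @param exposeLocality whether the table's I/O may report block locations
   */
  static AssignerKind resolve(AssignerKind configured, boolean exposeLocality) {
    // An explicit user setting always wins.
    if (configured != null) {
      return configured;
    }
    // Only when the option is unset do we infer the assigner from the I/O type.
    return exposeLocality ? AssignerKind.LOCALITY : AssignerKind.SIMPLE;
  }
}
```

With a non-null default, the `configured != null` branch would always be taken and the inference branch would be unreachable. The sketch takes a primitive `boolean`. A boxed `Boolean` parameter would throw `NullPointerException` on unboxing if it were ever passed null.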



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1111418712


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {
+      Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next();
+      Deque<IcebergSourceSplit> splits = splitsEntry.getValue();
+      if (splits.isEmpty()) {
+        splitsIterator.remove();
+        continue;
+      }
+
+      if (splitsEntry.getKey().contains(hostname)) {
+        return splits;
+      }
+    }
+
+    if (!splitsDeque.isEmpty()) {
+      return splitsDeque.values().stream().findAny().get();
+    }
+
+    return new ArrayDeque<>();

Review Comment:
   In the code above, if there are splits that haven't been assigned yet, it returns any one of them.
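The fallback described above is: prefer a split that is local to the requesting host, otherwise hand out any pending one. It can also be done without scanning every host set on each request, which relates to the `O(N)` concern raised earlier. The following is a minimal sketch under the following assumptions:

* Splits are plain strings.
* `LocalFirstPicker` is a hypothetical class.
* It keeps a hostname index plus a global pending set.
* Splits that were already assigned through another host are skipped lazily.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of "local split first, otherwise any pending split". */
final class LocalFirstPicker {
  // hostname -> splits that have a replica on that host
  private final Map<String, Set<String>> byHost = new HashMap<>();
  // every split not yet handed out, in insertion order
  private final Set<String> pending = new LinkedHashSet<>();

  synchronized void add(String split, String... hostnames) {
    pending.add(split);
    for (String host : hostnames) {
      byHost.computeIfAbsent(host, h -> new LinkedHashSet<>()).add(split);
    }
  }

  /** Returns a split local to hostname if one exists, else any pending split, else null. */
  synchronized String next(String hostname) {
    Set<String> local = hostname == null ? null : byHost.get(hostname);
    String chosen = null;
    if (local != null) {
      Iterator<String> it = local.iterator();
      // Entries may refer to splits already assigned via another host; drop them as we go.
      while (it.hasNext() && chosen == null) {
        String candidate = it.next();
        it.remove();
        if (pending.contains(candidate)) {
          chosen = candidate;
        }
      }
      if (local.isEmpty()) {
        byHost.remove(hostname); // an empty entry holds nothing, so removing it is safe
      }
    }
    if (chosen == null && !pending.isEmpty()) {
      chosen = pending.iterator().next(); // fallback: no local split, take any remaining one
    }
    if (chosen != null) {
      pending.remove(chosen);
    }
    return chosen;
  }
}
```

Each (split, host) entry is removed at most once, so the cleanup cost is bounded by the total number of replicas. Each request does a single map lookup instead of iterating over all unique host sets.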



[GitHub] [iceberg] hililiwei commented on pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#issuecomment-1541475364

   cc @stevenzwu @pvary, could you please take a look when you have a chance? Thanks.
   
   


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1143722125


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A split assigner that assigns splits to subtasks based on the locality of the splits. */
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<String, Set<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public synchronized GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    IcebergSourceSplit split =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME)
+            : getIcebergSourceSplits(hostname);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    return split != null ? GetSplitResult.forSplit(split) : GetSplitResult.unavailable();
+  }
+
+  private IcebergSourceSplit getIcebergSourceSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = getSplits(hostname);
+
+    if (icebergSourceSplits != null) {
+      Optional<IcebergSourceSplit> first = icebergSourceSplits.stream().findFirst();
+      if (first.isPresent()) {
+        pendingSplits.values().forEach(splitSet -> splitSet.remove(first.get()));
+        return first.get();
+      }
+    }
+
+    return null;
+  }
+
+  private Set<IcebergSourceSplit> getSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = pendingSplits.get(hostname);
+    if (icebergSourceSplits != null) {
+      if (!icebergSourceSplits.isEmpty()) {
+        return icebergSourceSplits;
+      }
+    }
+
+    pendingSplits.remove(hostname);

Review Comment:
   Is it correct to remove the whole entry for the hostname? Can you explain the algorithm in this method? The logic here is not easy to understand.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java:
##########
@@ -43,6 +42,20 @@ static boolean isLocalityEnabled(
     return Util.mayHaveBlockLocations(table.io(), table.location());
   }
 
+  static SplitAssignerFactory createAssignerFactory(
+      ReadableConfig readableConfig, Boolean exposeLocality) {
+    SplitAssignerType assignerType =
+        readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
+    if (assignerType != null) {
+      return assignerType.factory();
+    }
+
+    if (exposeLocality) {
+      return SplitAssignerType.LOCALITY.factory();
+    }
+    return SplitAssignerType.SIMPLE.factory();

Review Comment:
   nit: empty line



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A split assigner that assigns splits to subtasks based on the locality of the splits. */
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<String, Set<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public synchronized GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    IcebergSourceSplit split =
+        hostname == null

Review Comment:
   I'm wondering whether the hostname can be null for the locality-aware assigner. Should we add a precondition check?
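A `null` hostname could be rejected instead of being mapped to `DEFAULT_HOSTNAME`. A fail-fast check could look like the following sketch. It uses the JDK's `Objects.requireNonNull`. Inside Iceberg, the relocated Guava `Preconditions` would likely be the more usual choice. `HostnameCheck` is a hypothetical name.

```java
import java.util.Objects;

/** Hypothetical sketch: fail fast when the locality-aware assigner receives no hostname. */
final class HostnameCheck {
  private HostnameCheck() {}

  static String requireHostname(String hostname) {
    // Throws NullPointerException with this message when hostname is null.
    return Objects.requireNonNull(hostname, "hostname is required by the locality-aware assigner");
  }
}
```

Whether failing fast is appropriate depends on whether a null hostname is reachable. The reader-side change in this PR initializes `hostname` to `null` and only assigns it inside a `try`, so a failed lookup appears to produce a null value.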



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A split assigner that assigns splits to subtasks based on the locality of the splits. */
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<String, Set<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public synchronized GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    IcebergSourceSplit split =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME)
+            : getIcebergSourceSplits(hostname);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    return split != null ? GetSplitResult.forSplit(split) : GetSplitResult.unavailable();
+  }
+
+  private IcebergSourceSplit getIcebergSourceSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = getSplits(hostname);
+
+    if (icebergSourceSplits != null) {
+      Optional<IcebergSourceSplit> first = icebergSourceSplits.stream().findFirst();
+      if (first.isPresent()) {
+        pendingSplits.values().forEach(splitSet -> splitSet.remove(first.get()));
+        return first.get();
+      }
+    }
+
+    return null;
+  }
+
+  private Set<IcebergSourceSplit> getSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = pendingSplits.get(hostname);
+    if (icebergSourceSplits != null) {
+      if (!icebergSourceSplits.isEmpty()) {
+        return icebergSourceSplits;
+      }
+    }
+
+    pendingSplits.remove(hostname);
+
+    return pendingSplits.values().stream()
+        .filter(splitSet -> splitSet != null && splitSet.size() > 0)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @Override
+  public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  @Override
+  public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  private synchronized void addSplits(Collection<IcebergSourceSplit> splits) {
+    if (splits.isEmpty()) {
+      return;
+    }
+
+    for (IcebergSourceSplit split : splits) {
+      String[] hostnames = split.hostnames();
+      if (hostnames == null) {
+        hostnames = new String[] {DEFAULT_HOSTNAME};
+      }
+
+      for (String hostname : hostnames) {
+        pendingSplits.compute(
+            hostname,
+            (key, value) -> {
+              if (value == null) {
+                return Sets.newHashSet(split);
+              } else {
+                value.add(split);
+                return value;
+              }
+            });
+      }
+    }
+
+    // only complete pending future if new splits are discovered
+    completeAvailableFuturesIfNeeded();
+  }
+
+  @Override
+  public synchronized Collection<IcebergSourceSplitState> state() {

Review Comment:
   Is this correct? The same split can be added to the map multiple times, once for each hostname, right?
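If each split is stored once per hostname, `state()` has to de-duplicate before checkpointing. Otherwise a restore could hand the same split out more than once, depending on how split equality is defined. The following minimal sketch assumes a `Map<String, Set<...>>` layout like the one in the diff. `PendingSnapshot` and `distinctSplits` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: snapshot pending splits once each, even if indexed under many hosts. */
final class PendingSnapshot {
  private PendingSnapshot() {}

  static <S> List<S> distinctSplits(Map<String, ? extends Collection<S>> splitsByHost) {
    // LinkedHashSet drops duplicates (via equals/hashCode) while keeping a stable order.
    Set<S> unique = new LinkedHashSet<>();
    for (Collection<S> splits : splitsByHost.values()) {
      unique.addAll(splits);
    }
    return new ArrayList<>(unique);
  }
}
```

`addSplits` adds the same `split` instance under every hostname. De-duplication therefore works even if the split class relies on identity equality.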



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java:
##########
@@ -89,6 +106,7 @@ public String toString() {
         .add("files", toString(task.files()))
         .add("fileOffset", fileOffset)
         .add("recordOffset", recordOffset)
+        .add("hostname", hostnames)

Review Comment:
   Should be `hostnames`.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java:
##########
@@ -57,6 +57,12 @@ private SplitHelpers() {}
    */
   public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
       TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception {
+    return createSplitsFromTransientHadoopTable(temporaryFolder, fileCount, filesPerSplit, null);
+  }
+
+  public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
+      TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit, String[] hostname)

Review Comment:
   Should be `hostnames`. Please check all the references (singular -> plural).



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1093366709


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java:
##########
@@ -94,14 +94,17 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
       ConfigOptions.key("table.exec.iceberg.split-assigner-type")
           .enumType(SplitAssignerType.class)
-          .defaultValue(SplitAssignerType.SIMPLE)
+          .noDefaultValue()

Review Comment:
   Why do we change the default? I thought it was good to set a default for the more common scenario, for which the simple assigner is probably good enough.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java:
##########
@@ -94,14 +94,17 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
       ConfigOptions.key("table.exec.iceberg.split-assigner-type")
           .enumType(SplitAssignerType.class)
-          .defaultValue(SplitAssignerType.SIMPLE)
+          .noDefaultValue()
           .withDescription(
               Description.builder()
                   .text("Split assigner type that determine how splits are assigned to readers.")
                   .linebreak()
                   .list(
                       TextElement.text(
                           SplitAssignerType.SIMPLE
-                              + ": simple assigner that doesn't provide any guarantee on order or locality."))
+                              + ": simple assigner that doesn't provide any guarantee on order or locality."),
+                      TextElement.text(
+                          SplitAssignerType.LOCALITY
+                              + ": locality assigner that provide guarantee on locality."))

Review Comment:
   We shouldn't use `guarantee`, which is not very accurate. My suggestion would be `that assigns splits with a locality affinity preference`.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java:
##########
@@ -68,6 +67,24 @@ static boolean isLocalityEnabled(
     return false;
   }
 
+  static SplitAssignerFactory assignerFactory(

Review Comment:
   This is not a getter method, so `createAssignerFactory` is probably a more appropriate name?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -236,8 +236,6 @@ public FlinkInputFormat buildFormat() {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
       }
 
-      contextBuilder.exposeLocality(

Review Comment:
   Why remove this part?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {

Review Comment:
   Please check `SimpleSplitAssigner`. Some or most methods may need to be `synchronized` to be thread-safe.
   
   Also, maybe add a short description of what this assigner does, similar to the config doc.
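The pattern being suggested is that every method touching the shared pending state holds the same monitor, including the availability future. It can be sketched as follows. This is not a copy of `SimpleSplitAssigner`. The class and method names are illustrative, and strings stand in for splits.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: all access to pending state and the availability future is synchronized. */
final class SyncedAssignerSketch {
  private final Deque<String> pending = new ArrayDeque<>();
  private CompletableFuture<Void> availableFuture;

  /** Hands out the next split, or null when nothing is pending. */
  synchronized String next() {
    return pending.poll();
  }

  /** Adds splits and wakes up anyone waiting on isAvailable(). */
  synchronized void addSplits(Collection<String> splits) {
    if (splits.isEmpty()) {
      return;
    }
    pending.addAll(splits);
    completeAvailableFutureIfNeeded();
  }

  /** Returns a future that completes once at least one split is pending. */
  synchronized CompletableFuture<Void> isAvailable() {
    if (!pending.isEmpty()) {
      return CompletableFuture.completedFuture(null);
    }
    if (availableFuture == null) {
      availableFuture = new CompletableFuture<>();
    }
    return availableFuture;
  }

  // Only called from synchronized methods, so the monitor is already held.
  private void completeAvailableFutureIfNeeded() {
    if (availableFuture != null && !pending.isEmpty()) {
      availableFuture.complete(null);
      availableFuture = null;
    }
  }
}
```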



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));

Review Comment:
   `collect` copies the entire stream into a new list, and the state can be big. Why don't we just call `forEach` on the input collection to avoid the copy?
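The suggestion is to restore directly from the checkpointed collection without building an intermediate list. A minimal sketch follows. `SplitStateStub` is a hypothetical stand-in for `IcebergSourceSplitState`, and `addSplit` is a hypothetical per-split helper.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for IcebergSourceSplitState: wraps one split. */
final class SplitStateStub {
  private final String split;

  SplitStateStub(String split) {
    this.split = split;
  }

  String split() {
    return split;
  }
}

/** Sketch: restore from checkpointed state without an intermediate list. */
final class RestoringAssigner {
  private final List<String> pending = new ArrayList<>();

  RestoringAssigner(Collection<SplitStateStub> assignerState) {
    // Iterate the restored collection directly instead of stream().map(...).collect(toList()).
    assignerState.forEach(state -> addSplit(state.split()));
  }

  private void addSplit(String split) {
    pending.add(split);
  }

  List<String> pending() {
    return pending;
  }
}
```

The diff's `addSplits` also completes the availability future. With a per-split helper, that completion would naturally move to a single call after the loop.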



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssignerType.java:
##########
@@ -27,6 +27,12 @@ public enum SplitAssignerType {
     public SplitAssignerFactory factory() {
       return new SimpleSplitAssignerFactory();
     }
+  },
+  LOCALITY {

Review Comment:
   nit: there should be an empty line between the enum constants.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java:
##########
@@ -69,6 +74,13 @@ protected IcebergSourceSplit toSplitType(String splitId, IcebergSourceSplit spli
   }
 
   private void requestSplit(Collection<String> finishedSplitIds) {
-    context.sendSourceEventToCoordinator(new SplitRequestEvent(finishedSplitIds));
+    String hostname = null;
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();

Review Comment:
   This shouldn't need to be done on every `requestSplit` call. It can be done once during initialization, right?
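A minimal sketch of resolving the hostname once at construction time. `HostAwareReader` and `hostnameForRequest` are hypothetical names; the lookup itself uses only the JDK's `InetAddress.getLocalHost().getHostName()`, as the PR does:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.function.Supplier;

// Hypothetical reader fragment: the hostname is looked up once, at construction.
final class HostAwareReader {
  // Cached result; null means "no locality preference".
  private final String hostname;

  HostAwareReader() {
    this(HostAwareReader::resolveLocalHostname);
  }

  // The resolver is injectable only so the single lookup can be observed.
  HostAwareReader(Supplier<String> resolver) {
    this.hostname = resolver.get();
  }

  static String resolveLocalHostname() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      // Fall back to no preference instead of failing the reader.
      return null;
    }
  }

  // Stands in for requestSplit(...): every call reuses the cached value.
  String hostnameForRequest() {
    return hostname;
  }
}
```

A real reader could simply resolve the value in its constructor; the `Supplier` overload exists only to make the one-time lookup testable.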



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {

Review Comment:
   I am not sure this is the right approach. It can make getting a split very slow if there are a lot of splits with different hosts, because every `getNext` call iterates over the map entries until it finds a matching host set. It seems to me that we shouldn't use `Set<String>` as the map key.
   
   I think that in the split planning phase we maybe shouldn't use `CombinedScanTask` directly. We can create one task per file so that each task has a single hostname. Then the map key can be a single string.
   
   If we want to optimize further, we can group the files for the same host back into a `CombinedScanTask` in the planning phase.
   
   cc @pvary 
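A rough sketch of the planning-phase idea, assuming one task per file with a single preferred host. `FileTask` and `HostGrouping` are hypothetical types, not Iceberg's `FileScanTask`/`CombinedScanTask` API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-file task carrying a single preferred host.
final class FileTask {
  final String path;
  final String host;

  FileTask(String path, String host) {
    this.path = path;
    this.host = host;
  }
}

final class HostGrouping {
  private HostGrouping() {}

  // Groups per-file tasks by their single host string, so each group could
  // later be combined back into one multi-file task for that host.
  static Map<String, List<FileTask>> groupByHost(List<FileTask> tasks) {
    Map<String, List<FileTask>> byHost = new LinkedHashMap<>();
    for (FileTask task : tasks) {
      byHost.computeIfAbsent(task.host, h -> new ArrayList<>()).add(task);
    }
    return byHost;
  }
}
```

A real planner would presumably also cap each host group at the target split size before combining it back into a `CombinedScanTask`.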



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSplitAssignerBase.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.junit.Assert;
+import org.mockito.internal.util.collections.Sets;
+
+public abstract class TestSplitAssignerBase {
+
+  protected void assertAvailableFuture(
+      SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable) {
+    assertAvailableFuture(assigner, splitCount, addSplitsRunnable, null);
+  }
+
+  protected void assertAvailableFuture(
+      SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable, String hostname) {
+    // register callback
+    AtomicBoolean futureCompleted = new AtomicBoolean();
+    CompletableFuture<Void> future = assigner.isAvailable();
+    future.thenAccept(ignored -> futureCompleted.set(true));
+    // calling isAvailable again should return the same object reference
+    // note that thenAccept will return a new future.
+    // we want to assert the same instance on the assigner returned future
+    Assert.assertSame(future, assigner.isAvailable());
+
+    // now add some splits
+    addSplitsRunnable.run();
+    Assert.assertEquals(true, futureCompleted.get());
+
+    for (int i = 0; i < splitCount; ++i) {
+      assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, hostname);
+    }
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);

Review Comment:
   nit: use the `hostname` overload of `assertGetNext` here too.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSplitAssignerBase.java:
##########
@@ -0,0 +1,86 @@
+public abstract class TestSplitAssignerBase {
+
+  protected void assertAvailableFuture(
+      SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable) {
+    assertAvailableFuture(assigner, splitCount, addSplitsRunnable, null);
+  }
+
+  protected void assertAvailableFuture(
+      SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable, String hostname) {
+    // register callback
+    AtomicBoolean futureCompleted = new AtomicBoolean();
+    CompletableFuture<Void> future = assigner.isAvailable();
+    future.thenAccept(ignored -> futureCompleted.set(true));
+    // calling isAvailable again should return the same object reference
+    // note that thenAccept will return a new future.
+    // we want to assert the same instance on the assigner returned future
+    Assert.assertSame(future, assigner.isAvailable());
+
+    // now add some splits
+    addSplitsRunnable.run();
+    Assert.assertEquals(true, futureCompleted.get());
+
+    for (int i = 0; i < splitCount; ++i) {
+      assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, hostname);

Review Comment:
   Can we also assert on `pendingSplitCount()` here?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java:
##########
@@ -39,24 +39,37 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
 
   private int fileOffset;
   private long recordOffset;
+  private final String[] hostname;

Review Comment:
   The variable name should be plural since it is an array. This applies everywhere it is used.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -434,6 +436,9 @@ public IcebergSource<T> build() {
         }
       }
 
+      splitAssignerFactory =
+          SourceUtil.assignerFactory(flinkConfig, splitAssignerFactory, context.exposeLocality());

Review Comment:
   This method's behavior is a little weird: we pass in `splitAssignerFactory` and get another `splitAssignerFactory` back. It is clearer to do the null check outside the method.
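A sketch of moving the null check to the call site. `AssignerFactory`, `DefaultFactories`, and `SourceBuilder` are hypothetical stand-ins, and the default is chosen from a single `exposeLocality` flag rather than the full `flinkConfig`:

```java
// Hypothetical stand-in for SplitAssignerFactory.
interface AssignerFactory {
  String name();
}

final class DefaultFactories {
  private DefaultFactories() {}

  // Computes the default purely from configuration; it never receives the
  // user-supplied factory, so there is no "pass one in, get one back" contract.
  static AssignerFactory forLocality(boolean exposeLocality) {
    if (exposeLocality) {
      return () -> "locality";
    }
    return () -> "simple";
  }
}

final class SourceBuilder {
  private AssignerFactory splitAssignerFactory;
  private boolean exposeLocality;

  SourceBuilder assignerFactory(AssignerFactory factory) {
    this.splitAssignerFactory = factory;
    return this;
  }

  SourceBuilder exposeLocality(boolean expose) {
    this.exposeLocality = expose;
    return this;
  }

  AssignerFactory resolveAssignerFactory() {
    // The null check sits at the call site: an explicitly set factory wins.
    if (splitAssignerFactory == null) {
      splitAssignerFactory = DefaultFactories.forLocality(exposeLocality);
    }
    return splitAssignerFactory;
  }
}
```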



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {
+      Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next();
+      Deque<IcebergSourceSplit> splits = splitsEntry.getValue();
+      if (splits.isEmpty()) {
+        splitsIterator.remove();
+        continue;
+      }
+
+      if (splitsEntry.getKey().contains(hostname)) {
+        return splits;
+      }
+    }
+
+    if (!splitsDeque.isEmpty()) {
+      return splitsDeque.values().stream().findAny().get();
+    }
+
+    return new ArrayDeque<>();
+  }
+
+  @Override
+  public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  @Override
+  public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  private void addSplits(Collection<IcebergSourceSplit> splits) {
+    if (splits.isEmpty()) {
+      return;
+    }
+
+    for (IcebergSourceSplit split : splits) {
+      String[] hostname = split.hostname();
+      if (hostname == null) {
+        hostname = new String[] {DEFAULT_HOSTNAME};
+      }
+
+      Set<String> hosts = Sets.newHashSet(hostname);
+      Deque<IcebergSourceSplit> icebergSourceSplits =
+          pendingSplits.computeIfAbsent(hosts, key -> new ArrayDeque<>());
+      icebergSourceSplits.add(split);
+    }
+
+    // only complete pending future if new splits are discovered
+    completeAvailableFuturesIfNeeded();
+  }
+
+  /** Simple assigner only tracks unassigned splits */
+  @Override
+  public Collection<IcebergSourceSplitState> state() {
+    return pendingSplits.values().stream()
+        .flatMap(Collection::stream)
+        .map(split -> new IcebergSourceSplitState(split, IcebergSourceSplitStatus.UNASSIGNED))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public synchronized CompletableFuture<Void> isAvailable() {
+    if (availableFuture == null) {
+      availableFuture = new CompletableFuture<>();
+    }
+    return availableFuture;
+  }
+
+  @Override
+  public int pendingSplitCount() {
+    return (int) pendingSplits.values().stream().mapToInt(Deque::size).count();

Review Comment:
   The last step should be `sum()`, right? `count()` returns the number of deques, not the number of splits. Maybe we should add some assertions in the unit tests.
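A small sketch of the difference, using hypothetical `String` host keys instead of the PR's `Set<String>` keys. With two deques holding 2 and 3 splits, `count()` yields 2 (the number of deques) while `sum()` yields 5 (the number of splits):

```java
import java.util.Deque;
import java.util.Map;

final class PendingCount {
  private PendingCount() {}

  // What the PR computes: count() counts the stream elements, i.e. one per deque.
  static int countOfDeques(Map<String, Deque<String>> pending) {
    return (int) pending.values().stream().mapToInt(Deque::size).count();
  }

  // The fix: sum() adds up the deque sizes; IntStream.sum() already returns int.
  static int pendingSplitCount(Map<String, Deque<String>> pending) {
    return pending.values().stream().mapToInt(Deque::size).sum();
  }
}
```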



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java:
##########
@@ -32,7 +32,7 @@ public class SplitRequestEvent implements SourceEvent {
   private final String requesterHostname;
 
   public SplitRequestEvent() {
-    this(Collections.emptyList());
+    this(Collections.emptyList(), null);

Review Comment:
   This change seems unnecessary?



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {
+      Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next();
+      Deque<IcebergSourceSplit> splits = splitsEntry.getValue();
+      if (splits.isEmpty()) {
+        splitsIterator.remove();
+        continue;
+      }
+
+      if (splitsEntry.getKey().contains(hostname)) {
+        return splits;
+      }
+    }
+
+    if (!splitsDeque.isEmpty()) {
+      return splitsDeque.values().stream().findAny().get();
+    }
+
+    return new ArrayDeque<>();
+  }
+
+  @Override
+  public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  @Override
+  public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  private void addSplits(Collection<IcebergSourceSplit> splits) {
+    if (splits.isEmpty()) {
+      return;
+    }
+
+    for (IcebergSourceSplit split : splits) {
+      String[] hostname = split.hostname();
+      if (hostname == null) {
+        hostname = new String[] {DEFAULT_HOSTNAME};
+      }
+
+      Set<String> hosts = Sets.newHashSet(hostname);
+      Deque<IcebergSourceSplit> icebergSourceSplits =
+          pendingSplits.computeIfAbsent(hosts, key -> new ArrayDeque<>());
+      icebergSourceSplits.add(split);
+    }
+
+    // only complete pending future if new splits are discovered
+    completeAvailableFuturesIfNeeded();
+  }
+
+  /** Simple assigner only tracks unassigned splits */

Review Comment:
   Copy-paste error: this Javadoc still refers to the simple assigner.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {
+      Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next();
+      Deque<IcebergSourceSplit> splits = splitsEntry.getValue();
+      if (splits.isEmpty()) {
+        splitsIterator.remove();
+        continue;
+      }
+
+      if (splitsEntry.getKey().contains(hostname)) {
+        return splits;
+      }
+    }
+
+    if (!splitsDeque.isEmpty()) {
+      return splitsDeque.values().stream().findAny().get();
+    }
+
+    return new ArrayDeque<>();

Review Comment:
   I also think this shouldn't guarantee locality. Instead, it can be a preference: if there are splits matching the requesting host, prefer those splits; otherwise, it is free to hand out splits from any other host.
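A minimal sketch of the preference behavior, assuming splits are keyed by a single host string as suggested earlier in this review. `PreferLocalAssigner` and its methods are hypothetical and much simpler than the PR's `LocalitySplitAssigner`:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simplified assigner: each split has one host string, and
// locality is a preference rather than a guarantee.
final class PreferLocalAssigner {
  private final Map<String, Deque<String>> pendingByHost = new LinkedHashMap<>();

  void addSplit(String host, String split) {
    pendingByHost.computeIfAbsent(host, h -> new ArrayDeque<>()).add(split);
  }

  // Returns the next split for the requester, or null if nothing is pending.
  String getNext(String requesterHost) {
    // Prefer a split on the requesting host: a single map lookup, no scan.
    if (requesterHost != null && pendingByHost.containsKey(requesterHost)) {
      return pollAndPrune(requesterHost);
    }
    // Otherwise hand out a split from any host instead of leaving the reader idle.
    Iterator<String> hosts = pendingByHost.keySet().iterator();
    return hosts.hasNext() ? pollAndPrune(hosts.next()) : null;
  }

  int pendingSplitCount() {
    return pendingByHost.values().stream().mapToInt(Deque::size).sum();
  }

  // Polls one split and drops the host entry once its deque is empty,
  // so the map never holds empty deques.
  private String pollAndPrune(String host) {
    Deque<String> splits = pendingByHost.get(host);
    String split = splits.poll();
    if (splits.isEmpty()) {
      pendingByHost.remove(host);
    }
    return split;
  }
}
```

Because empty deques are pruned on every poll, the fallback path can take the first remaining host in insertion order without skipping over empty entries.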



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {
+      Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next();
+      Deque<IcebergSourceSplit> splits = splitsEntry.getValue();
+      if (splits.isEmpty()) {
+        splitsIterator.remove();
+        continue;
+      }
+
+      if (splitsEntry.getKey().contains(hostname)) {
+        return splits;
+      }
+    }
+
+    if (!splitsDeque.isEmpty()) {
+      return splitsDeque.values().stream().findAny().get();
+    }
+
+    return new ArrayDeque<>();
+  }
+
+  @Override
+  public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  @Override
+  public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  private void addSplits(Collection<IcebergSourceSplit> splits) {
+    if (splits.isEmpty()) {
+      return;
+    }
+
+    for (IcebergSourceSplit split : splits) {
+      String[] hostname = split.hostname();

Review Comment:
   should be plural





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1111404402


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));

Review Comment:
   I'm a little confused here. This only creates a new list, but each element is still a reference to the original split object, so it's a kind of shallow copy, right?
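
To illustrate the question: `stream().map(...).collect(Collectors.toList())` builds a new `List` object, but its elements are the same references as in the source, i.e. a shallow copy. The sketch below is a minimal illustration, not the PR's code; `SplitState` is a hypothetical stand-in for `IcebergSourceSplitState`, and a `StringBuilder` stands in for a mutable split.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class ShallowCopyDemo {
  // Hypothetical stand-in for IcebergSourceSplitState; StringBuilder plays a mutable split.
  static final class SplitState {
    private final StringBuilder split;

    SplitState(StringBuilder split) {
      this.split = split;
    }

    StringBuilder split() {
      return split;
    }
  }

  // Same shape as the constructor under review: map each state to its split, collect to a list.
  static List<StringBuilder> extractSplits(List<SplitState> states) {
    return states.stream().map(SplitState::split).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<SplitState> states = new ArrayList<>();
    states.add(new SplitState(new StringBuilder("split-1")));

    List<StringBuilder> splits = extractSplits(states);
    // The list is new, but its element is the very same object held by the state...
    System.out.println(splits.get(0) == states.get(0).split()); // prints true
    // ...so a mutation through the new list is visible through the original state.
    splits.get(0).append("-modified");
    System.out.println(states.get(0).split()); // prints split-1-modified
  }
}
```

Whether this matters depends on whether anything mutates the splits after the assigner is restored; if nothing does, sharing the references is harmless.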



[GitHub] [iceberg] hililiwei commented on pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#issuecomment-1364514213

   @stevenzwu @openinx @rdblue @Fokko @pvary  could you please take a look at it when you get a chance? thx.
   
   


[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1111756854


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {

Review Comment:
   I understand what you mean. Perhaps we could consider another approach:
   
   1. Construct a `Map<Hostname, Set<IcebergSourceSplit>>` named `splits`.
   2. Add each `IcebergSourceSplit` to `splits`. Since an `IcebergSourceSplit` may have multiple hostnames, multiple keys in `splits` will point to sets containing the same `IcebergSourceSplit`.
   3. To retrieve a split for a specific hostname, take any split from that hostname's set in `splits`, then look up the sets for all of that split's hostnames and remove the split from each of them.
   4. Return the split.
   
   Here's some pseudocode:
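   ```
   Map<String, Set<IcebergSourceSplit>> splits = new HashMap<>();
   
   // add IcebergSourceSplit splitA: register it under every one of its hostnames
   for (String hostname : splitA.hostnames()) {
     splits.computeIfAbsent(hostname, h -> new HashSet<>()).add(splitA);
   }
   
   // get IcebergSourceSplit splitB for "host1" (assuming host1 still has pending splits)
   IcebergSourceSplit splitB = splits.get("host1").iterator().next();
   
   // remove splitB from the sets of all its hostnames so it is not assigned twice
   for (String hostname : splitB.hostnames()) {
     splits.get(hostname).remove(splitB);
   }
   
   return splitB;
   ```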
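
The four steps above could be sketched as a small runnable class. This is a minimal illustration under assumptions, not the PR's implementation: `Split` is a hypothetical stand-in for `IcebergSourceSplit`, and `pollLocal` only covers the local case (no fallback for hosts without local splits, no synchronization).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class HostIndexedAssigner {
  // Hypothetical stand-in for IcebergSourceSplit: an id plus the hosts that store its data.
  static final class Split {
    final String id;
    final List<String> hostnames;

    Split(String id, String... hostnames) {
      this.id = id;
      this.hostnames = Arrays.asList(hostnames);
    }
  }

  // hostname -> pending splits with data on that host; a split is registered under each of its hosts.
  private final Map<String, Set<Split>> splitsByHost = new HashMap<>();

  void add(Split split) {
    for (String host : split.hostnames) {
      // LinkedHashSet keeps insertion order, so each host hands out its splits FIFO.
      splitsByHost.computeIfAbsent(host, h -> new LinkedHashSet<>()).add(split);
    }
  }

  /** Returns a split local to {@code host}, or null if that host has no pending splits. */
  Split pollLocal(String host) {
    Set<Split> local = splitsByHost.get(host);
    if (local == null || local.isEmpty()) {
      return null;
    }
    Split split = local.iterator().next();
    // Remove the split from every host that holds it so it is never assigned twice.
    for (String h : split.hostnames) {
      Set<Split> set = splitsByHost.get(h);
      if (set != null) {
        set.remove(split);
        if (set.isEmpty()) {
          splitsByHost.remove(h); // drop exhausted hosts
        }
      }
    }
    return split;
  }
}
```

The cost of the multi-key index is that removal touches one set per replica host, which is typically small (for example, the HDFS replication factor).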



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1139922919


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {

Review Comment:
   In my opinion, there is no need to do so: even if a split contains only one file, that file may be stored as multiple blocks, possibly on different hosts.



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1153978889


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A split assigner that assigns splits to subtasks based on the locality of the splits. */
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<String, Set<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public synchronized GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    IcebergSourceSplit split =
+        hostname == null

Review Comment:
   Modified it.



[GitHub] [iceberg] hililiwei commented on pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#issuecomment-1330494784

   @stevenzwu @openinx @rdblue @Fokko could you please take a look at it when you get a chance? thx.


[GitHub] [iceberg] hililiwei commented on pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#issuecomment-1513970763

   cc @stevenzwu could you please take a look when you have a chance?


[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1111418712


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {
+      Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next();
+      Deque<IcebergSourceSplit> splits = splitsEntry.getValue();
+      if (splits.isEmpty()) {
+        splitsIterator.remove();
+        continue;
+      }
+
+      if (splitsEntry.getKey().contains(hostname)) {
+        return splits;
+      }
+    }
+
+    if (!splitsDeque.isEmpty()) {
+      return splitsDeque.values().stream().findAny().get();
+    }
+
+    return new ArrayDeque<>();

Review Comment:
   In the code above, if there are splits that haven't been assigned yet, any one of them is returned. Otherwise, an empty deque is returned, indicating that no more splits are available.
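
The lookup order described above (drop exhausted groups, prefer a group whose host set contains the requested host, otherwise fall back to any remaining group, otherwise return an empty deque) can be sketched generically. `FallbackLookup.splitsFor` is a hypothetical helper written for illustration; it uses a generic element type instead of `IcebergSourceSplit`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

class FallbackLookup {
  /**
   * Returns the group whose host set contains {@code hostname}; otherwise any remaining
   * non-empty group; otherwise an empty deque, meaning no splits are left.
   */
  static <T> Deque<T> splitsFor(String hostname, Map<Set<String>, Deque<T>> groups) {
    Iterator<Map.Entry<Set<String>, Deque<T>>> it = groups.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Set<String>, Deque<T>> entry = it.next();
      if (entry.getValue().isEmpty()) {
        it.remove(); // clean up exhausted groups while scanning
      } else if (entry.getKey().contains(hostname)) {
        return entry.getValue(); // local group found
      }
    }
    // Every empty group was removed above, so any remaining group is non-empty.
    return groups.isEmpty() ? new ArrayDeque<T>() : groups.values().iterator().next();
  }
}
```

The fallback trades locality for progress: a reader on a host with no local data still gets work instead of waiting.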



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1128200547


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    Deque<IcebergSourceSplit> icebergSourceSplits =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits)
+            : getIcebergSourceSplits(hostname, pendingSplits);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    if (!icebergSourceSplits.isEmpty()) {
+      IcebergSourceSplit split = icebergSourceSplits.poll();
+      return GetSplitResult.forSplit(split);
+    }
+
+    return GetSplitResult.unavailable();
+  }
+
+  private Deque<IcebergSourceSplit> getIcebergSourceSplits(
+      String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) {
+    if (splitsDeque.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator =
+        splitsDeque.entrySet().iterator();
+    while (splitsIterator.hasNext()) {

Review Comment:
   The pseudo-algorithm looks OK to me. BTW, for a locality-aware assigner, does it mean that a `CombinedScanTask` should ideally contain just one file? Do we need to decompose it into individual files? E.g., a combined task contains 3 files: f1(h1, h2, h3), f2(h4, h5, h6), f3(h7, h8, h9), and no single host holds all of them. But the downside would be more, smaller splits, so there is a tradeoff here. I'm not sure what the typical preference is in the HDFS scenario.
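
Using the f1/f2/f3 example from the comment, a small sketch of the tradeoff: with disjoint host sets, no single host is local to all of the combined task's data, while per-file splits are each fully local but there are three times as many. `FileTask`, `fullyLocalHosts` and `decompose` are hypothetical helpers for illustration, not Iceberg APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DecomposeCombinedTask {
  // Hypothetical per-file task: a file path plus the hosts storing its blocks.
  static final class FileTask {
    final String path;
    final Set<String> hosts;

    FileTask(String path, String... hosts) {
      this.path = path;
      this.hosts = new HashSet<>(Arrays.asList(hosts));
    }
  }

  /** Hosts that store data for every file in the group, i.e. hosts where the whole group is local. */
  static Set<String> fullyLocalHosts(List<FileTask> group) {
    if (group.isEmpty()) {
      return Collections.emptySet();
    }
    Set<String> common = new HashSet<>(group.get(0).hosts);
    for (FileTask task : group) {
      common.retainAll(task.hosts);
    }
    return common;
  }

  /** One split per file: each is fully local to its own hosts, at the cost of more, smaller splits. */
  static List<List<FileTask>> decompose(List<FileTask> combined) {
    List<List<FileTask>> splits = new ArrayList<>();
    for (FileTask task : combined) {
      splits.add(Collections.singletonList(task));
    }
    return splits;
  }
}
```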



[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1152885573


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A split assigner that assigns splits to subtasks based on the locality of the splits. */
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<String, Set<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public synchronized GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    IcebergSourceSplit split =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME)
+            : getIcebergSourceSplits(hostname);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    return split != null ? GetSplitResult.forSplit(split) : GetSplitResult.unavailable();
+  }
+
+  private IcebergSourceSplit getIcebergSourceSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = getSplits(hostname);
+
+    if (icebergSourceSplits != null) {
+      Optional<IcebergSourceSplit> first = icebergSourceSplits.stream().findFirst();
+      if (first.isPresent()) {
+        pendingSplits.values().forEach(splitSet -> splitSet.remove(first.get()));
+        return first.get();
+      }
+    }
+
+    return null;
+  }
+
+  private Set<IcebergSourceSplit> getSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = pendingSplits.get(hostname);
+    if (icebergSourceSplits != null) {
+      if (!icebergSourceSplits.isEmpty()) {
+        return icebergSourceSplits;
+      }
+    }
+
+    pendingSplits.remove(hostname);
+
+    return pendingSplits.values().stream()
+        .filter(splitSet -> splitSet != null && splitSet.size() > 0)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @Override
+  public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  @Override
+  public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
+    addSplits(splits);
+  }
+
+  private synchronized void addSplits(Collection<IcebergSourceSplit> splits) {
+    if (splits.isEmpty()) {
+      return;
+    }
+
+    for (IcebergSourceSplit split : splits) {
+      String[] hostnames = split.hostnames();
+      if (hostnames == null) {
+        hostnames = new String[] {DEFAULT_HOSTNAME};
+      }
+
+      for (String hostname : hostnames) {
+        pendingSplits.compute(
+            hostname,
+            (key, value) -> {
+              if (value == null) {
+                return Sets.newHashSet(split);
+              } else {
+                value.add(split);
+                return value;
+              }
+            });
+      }
+    }
+
+    // only complete pending future if new splits are discovered
+    completeAvailableFuturesIfNeeded();
+  }
+
+  @Override
+  public synchronized Collection<IcebergSourceSplitState> state() {

Review Comment:
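   Yes, there may be duplicate splits here. Because the splits are stored in a `Set`, I didn't deduplicate them explicitly. However, there is one `Set` per hostname, so a split with several hostnames appears in several of them. We can remove the duplicates here.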
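   A minimal sketch of that deduplication, assuming the per-host map layout shown in the diff. `DedupStateSketch` and `dedupPending` are hypothetical names, splits are generic, and wrapping each result into an `IcebergSourceSplitState` is left out. Because `addSplits` puts the same split object into every host's set, identity-based `equals`/`hashCode` is already enough for the `LinkedHashSet` to collapse the copies.

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   /** Hypothetical helper: flattens a host-keyed split index into a duplicate-free list. */
   final class DedupStateSketch {
     private DedupStateSketch() {}

     static <S> List<S> dedupPending(Map<String, ? extends Collection<S>> pendingSplits) {
       // A split with several hostnames sits in several per-host sets; the LinkedHashSet
       // keeps its first occurrence (in map iteration order) and drops the later copies.
       Set<S> unique = new LinkedHashSet<>();
       for (Collection<S> splits : pendingSplits.values()) {
         unique.addAll(splits);
       }
       return new ArrayList<>(unique);
     }
   }
   ```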





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6160: Flink: Support locality with LocalitySplitAssigner

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6160:
URL: https://github.com/apache/iceberg/pull/6160#discussion_r1152870720


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A split assigner that assigns splits to subtasks based on the locality of the splits. */
+@Internal
+public class LocalitySplitAssigner implements SplitAssigner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class);
+
+  private static final String DEFAULT_HOSTNAME = "hostname";
+  private final Map<String, Set<IcebergSourceSplit>> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  public LocalitySplitAssigner() {
+    this.pendingSplits = Maps.newHashMap();
+  }
+
+  public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
+    this.pendingSplits = Maps.newHashMap();
+    Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split);
+    addSplits(splits.collect(Collectors.toList()));
+  }
+
+  @Override
+  public synchronized GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return GetSplitResult.unavailable();
+    }
+
+    IcebergSourceSplit split =
+        hostname == null
+            ? getIcebergSourceSplits(DEFAULT_HOSTNAME)
+            : getIcebergSourceSplits(hostname);
+    LOG.info("Get Iceberg source splits for: {}", hostname);
+
+    return split != null ? GetSplitResult.forSplit(split) : GetSplitResult.unavailable();
+  }
+
+  private IcebergSourceSplit getIcebergSourceSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = getSplits(hostname);
+
+    if (icebergSourceSplits != null) {
+      Optional<IcebergSourceSplit> first = icebergSourceSplits.stream().findFirst();
+      if (first.isPresent()) {
+        pendingSplits.values().forEach(splitSet -> splitSet.remove(first.get()));
+        return first.get();
+      }
+    }
+
+    return null;
+  }
+
+  private Set<IcebergSourceSplit> getSplits(String hostname) {
+    Set<IcebergSourceSplit> icebergSourceSplits = pendingSplits.get(hostname);
+    if (icebergSourceSplits != null) {
+      if (!icebergSourceSplits.isEmpty()) {
+        return icebergSourceSplits;
+      }
+    }
+
+    pendingSplits.remove(hostname);

Review Comment:
   This looks up the split set keyed by the hostname in the map. If that set is non-empty, it is returned. Otherwise, the hostname is removed from the map (its value may be null or empty), and any other non-empty split set is found and returned instead.
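   A simplified model of that lookup, for illustration only: splits are plain `String`s, and `LocalityLookupSketch`, `add` and `next` are hypothetical names, not the PR's types. `getSplits` follows the steps described above, and `next` mirrors `getIcebergSourceSplits` by removing the handed-out split from every host's set.

   ```java
   import java.util.HashMap;
   import java.util.LinkedHashSet;
   import java.util.Map;
   import java.util.Optional;
   import java.util.Set;

   /** Hypothetical, simplified model of the host-keyed split lookup; splits are plain Strings. */
   final class LocalityLookupSketch {
     // hostname -> pending splits that list that host
     private final Map<String, Set<String>> pendingSplits = new HashMap<>();

     void add(String split, String... hostnames) {
       for (String host : hostnames) {
         pendingSplits.computeIfAbsent(host, k -> new LinkedHashSet<>()).add(split);
       }
     }

     // Prefer the caller's own non-empty set; otherwise drop its key and fall back to any non-empty set.
     Set<String> getSplits(String hostname) {
       Set<String> local = pendingSplits.get(hostname);
       if (local != null && !local.isEmpty()) {
         return local;
       }
       pendingSplits.remove(hostname); // value was null or empty
       return pendingSplits.values().stream()
           .filter(splits -> !splits.isEmpty())
           .findFirst()
           .orElse(null);
     }

     // Hands out one split and removes it from every host's set so it cannot be assigned twice.
     Optional<String> next(String hostname) {
       Set<String> candidates = getSplits(hostname);
       if (candidates == null) {
         return Optional.empty();
       }
       String split = candidates.iterator().next();
       pendingSplits.values().forEach(splits -> splits.remove(split));
       return Optional.of(split);
     }
   }
   ```

   The fallback takes whichever non-empty set the `HashMap` iterates first, so a subtask with no local splits receives an arbitrary remote split.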


