You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/05/05 19:07:19 UTC

[drill] 05/06: DRILL-6281: Introduce Collectors class for internal iterators

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d0a7545c854e5ef99ed45edd1fe3520aa6dcaa74
Author: Vlad Rozov <vr...@apache.org>
AuthorDate: Mon Apr 30 07:11:20 2018 -0700

    DRILL-6281: Introduce Collectors class for internal iterators
    
    closes #1238
---
 .../drill/common/collections/Collectors.java       | 123 +++++++++++++++++++++
 .../org/apache/drill/exec/store/TimedCallable.java |   9 +-
 .../exec/store/parquet/metadata/Metadata.java      |  17 ++-
 3 files changed, 132 insertions(+), 17 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/collections/Collectors.java b/common/src/main/java/org/apache/drill/common/collections/Collectors.java
new file mode 100644
index 0000000..3e80b2f
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/collections/Collectors.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.collections;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+public class Collectors {
+  private Collectors() { // static helper methods only; never instantiated
+  }
+
+  /**
+   * Maps every (key, value) entry of a {@code Map} into a newly created {@code List}.
+   * @param map {@code Map<K, V>} to collect elements from
+   * @param mapper {@code BiFunction} that maps from (key, value) pair to type {@code T}
+   * @param <T> elements type in {@code List}
+   * @param <K> key type in {@code Map}
+   * @param <V> value type in {@code Map}
+   * @return new {@code List} that contains elements after applying mapper {@code BiFunction} to the input {@code Map}
+   */
+  public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T> mapper) {
+    return collect(new ArrayList<>(map.size()), map, mapper); // result list is pre-sized to the number of entries
+  }
+
+  /**
+   * Maps every (key, value) entry of a {@code Map} and keeps only the mapped elements accepted by the filter.
+   * @param map {@code Map<K, V>} to collect elements from
+   * @param mapper {@code BiFunction} that maps from (key, value) pair to type {@code T}
+   * @param predicate {@code Predicate} filter to apply to the mapped element, not to the original entry
+   * @param <T> elements type in {@code List}
+   * @param <K> key type in {@code Map}
+   * @param <V> value type in {@code Map}
+   * @return new {@code List} that contains elements that satisfy {@code Predicate} after applying mapper {@code BiFunction}
+   *   to the input {@code Map}
+   */
+  public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T> mapper, Predicate<T> predicate) {
+    return collect(new ArrayList<>(map.size()), map, mapper, predicate); // capacity is an upper bound, filter may drop elements
+  }
+
+  public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map, BiFunction<K, V, T> mapper) {
+    Preconditions.checkNotNull(list); // fail fast on null arguments, before anything is appended
+    Preconditions.checkNotNull(map);
+    Preconditions.checkNotNull(mapper);
+    map.forEach((k, v) -> list.add(mapper.apply(k, v))); // append one mapped element per map entry
+    return list; // the same instance that was passed in, existing content is kept
+  }
+
+  public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map, BiFunction<K, V, T> mapper, Predicate<T> predicate) {
+    Preconditions.checkNotNull(list); // fail fast on null arguments, before anything is appended
+    Preconditions.checkNotNull(map);
+    Preconditions.checkNotNull(mapper);
+    Preconditions.checkNotNull(predicate);
+    map.forEach((k, v) -> {
+      T t = mapper.apply(k, v); // map first: the predicate sees the mapped element, not the entry
+      if (predicate.test(t)) {
+        list.add(t);
+      }
+    });
+    return list; // the same instance that was passed in, existing content is kept
+  }
+
+  /**
+   * Maps every element of a {@code Collection} into a newly created {@code List}.
+   * @param collection {@code Collection<E>} of elements of type {@code E}
+   * @param mapper {@code Function<E, T>} mapper function to apply
+   * @param <T> elements type in {@code List}
+   * @param <E> elements type in {@code Collection}
+   * @return new {@code List} that contains the results of applying mapper {@code Function} to every element
+   *   of the input {@code Collection}
+   */
+  public static <T, E> List<T> toList(Collection<E> collection, Function<E, T> mapper) {
+    Preconditions.checkNotNull(collection);
+    Preconditions.checkNotNull(mapper);
+    ArrayList<T> list = new ArrayList<>(collection.size()); // exactly one output element per input element
+    collection.forEach(e -> list.add(mapper.apply(e)));
+    return list;
+  }
+
+  /**
+   * Maps every element of a {@code Collection} and keeps only the mapped elements accepted by the filter.
+   * @param collection {@code Collection<E>} of elements of type {@code E}
+   * @param mapper {@code Function<E, T>} mapper function to apply
+   * @param predicate {@code Predicate} filter to apply to the mapped element, not to the original element
+   * @param <T> elements type in {@code List}
+   * @param <E> elements type in {@code Collection}
+   * @return new {@code List} of mapped elements of the input {@code Collection} that satisfy {@code Predicate}
+   */
+  public static <T, E> List<T> toList(Collection<E> collection, Function<E, T> mapper, Predicate<T> predicate) {
+    Preconditions.checkNotNull(collection);
+    Preconditions.checkNotNull(mapper);
+    Preconditions.checkNotNull(predicate);
+    ArrayList<T> list = new ArrayList<>(collection.size()); // capacity is an upper bound, filter may drop elements
+    collection.forEach(e -> {
+      T t = mapper.apply(e); // map first: the predicate sees the mapped element
+      if (predicate.test(t)) {
+        list.add(t);
+      }
+    });
+    return list;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
index ecc5579..3c2bbfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -30,8 +29,8 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
+import org.apache.drill.common.collections.Collectors;
 import org.apache.drill.common.exceptions.UserException;
 
 import org.slf4j.Logger;
@@ -212,11 +211,7 @@ public abstract class TimedCallable<V> implements Callable<V> {
     final FutureMapper<V> futureMapper = new FutureMapper<>();
     final Statistics<V> statistics = logger.isDebugEnabled() ? new Statistics<>() : null;
     try {
-      return threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS)
-          .stream()
-          .map(futureMapper)
-          .filter(Objects::nonNull)
-          .collect(Collectors.toList());
+      return Collectors.toList(threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS), futureMapper);
     } catch (InterruptedException e) {
       final String errMsg = String.format("Interrupted while waiting for activity '%s' tasks to be done.", activity);
       logger.error(errMsg, e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 49a6b52..cdf98e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.collections.Collectors;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.store.TimedCallable;
@@ -68,7 +69,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
 import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
@@ -294,7 +294,7 @@ public class Metadata {
 
     Map<FileStatus, FileSystem> fileStatusMap = fileStatuses.stream()
         .collect(
-            Collectors.toMap(
+            java.util.stream.Collectors.toMap(
                 Function.identity(),
                 s -> fs,
                 (oldFs, newFs) -> newFs,
@@ -335,14 +335,11 @@ public class Metadata {
    */
   private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(
       ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException {
-
-    List<TimedCallable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream()
-        .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
-
-    List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>();
-    metaDataList.addAll(TimedCallable.run("Fetch parquet metadata", logger, gatherers, 16));
-    return metaDataList;
+    return TimedCallable.run("Fetch parquet metadata", logger,
+        Collectors.toList(fileStatusMap,
+            (fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v3, fileStatus, fileSystem)),
+        16
+    );
   }
 
   /**

-- 
To stop receiving notification emails like this one, please contact
volodymyr@apache.org.