You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/05/05 19:07:19 UTC
[drill] 05/06: DRILL-6281: Introduce Collectors class for internal
iterators
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit d0a7545c854e5ef99ed45edd1fe3520aa6dcaa74
Author: Vlad Rozov <vr...@apache.org>
AuthorDate: Mon Apr 30 07:11:20 2018 -0700
DRILL-6281: Introduce Collectors class for internal iterators
closes #1238
---
.../drill/common/collections/Collectors.java | 123 +++++++++++++++++++++
.../org/apache/drill/exec/store/TimedCallable.java | 9 +-
.../exec/store/parquet/metadata/Metadata.java | 17 ++-
3 files changed, 132 insertions(+), 17 deletions(-)
diff --git a/common/src/main/java/org/apache/drill/common/collections/Collectors.java b/common/src/main/java/org/apache/drill/common/collections/Collectors.java
new file mode 100644
index 0000000..3e80b2f
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/collections/Collectors.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.collections;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+public class Collectors {
+ private Collectors() {
+ }
+
+ /**
+ * Applies the mapper to every entry of the map and gathers the results in a new {@code ArrayList} whose initial capacity is {@code map.size()}.
+ * @param map {@code Map<K, V>} to collect elements from
+ * @param mapper {@code BiFunction} that maps from (key, value) pair to type {@code T}
+ * @param <T> elements type in {@code List}
+ * @param <K> key type in {@code Map}
+ * @param <V> value type in {@code Map}
+ * @return new {@code List} that contains elements after applying mapper {@code BiFunction} to the input {@code Map}
+ */
+ public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T> mapper) {
+ return collect(new ArrayList<>(map.size()), map, mapper);
+ }
+
+ /**
+ * Applies the mapper to every entry of the map and keeps only the mapped values accepted by the predicate.
+ * @param map {@code Map<K, V>} to collect elements from
+ * @param mapper {@code BiFunction} that maps from (key, value) pair to type {@code T}
+ * @param predicate {@code Predicate} filter to apply
+ * @param <T> elements type in {@code List}
+ * @param <K> keys type in {@code Map}
+ * @param <V> value type in {@code Map}
+ * @return new {@code List} that contains elements that satisfy {@code Predicate} after applying mapper {@code BiFunction}
+ * to the input {@code Map}
+ */
+ public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T> mapper, Predicate<T> predicate) {
+ return collect(new ArrayList<>(map.size()), map, mapper, predicate);
+ }
+
+ public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map, BiFunction<K, V, T> mapper) {
+ Preconditions.checkNotNull(list);
+ Preconditions.checkNotNull(map);
+ Preconditions.checkNotNull(mapper);
+ map.forEach((k, v) -> list.add(mapper.apply(k, v))); // appends to the caller-supplied list, which is also the return value
+ return list;
+ }
+
+ public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map, BiFunction<K, V, T> mapper, Predicate<T> predicate) {
+ Preconditions.checkNotNull(list);
+ Preconditions.checkNotNull(map);
+ Preconditions.checkNotNull(mapper);
+ Preconditions.checkNotNull(predicate);
+ map.forEach((k, v) -> {
+ T t = mapper.apply(k, v);
+ if (predicate.test(t)) { // the filter is tested on the mapped value, not on the map entry
+ list.add(t);
+ }
+ });
+ return list;
+ }
+
+ /**
+ * Applies the mapper to every element of the collection and gathers all results, unfiltered, in a new {@code ArrayList}.
+ * @param collection {@code Collection<E>} of elements of type {@code E}
+ * @param mapper {@code Function<E, T>} mapper function to apply
+ * @param <T> elements type in {@code List}
+ * @param <E> elements type in {@code Collection}
+ * @return new {@code List} that contains elements after applying mapper {@code Function}
+ * to the input {@code Collection}
+ */
+ public static <T, E> List<T> toList(Collection<E> collection, Function<E, T> mapper) {
+ Preconditions.checkNotNull(collection);
+ Preconditions.checkNotNull(mapper);
+ ArrayList<T> list = new ArrayList<>(collection.size());
+ collection.forEach(e -> list.add(mapper.apply(e)));
+ return list;
+ }
+
+ /**
+ * Applies the mapper to every element of the collection and keeps only the mapped values accepted by the predicate.
+ * @param collection {@code Collection<E>} of elements of type {@code E}
+ * @param mapper {@code Function<E, T>} mapper function to apply
+ * @param predicate {@code Predicate} filter to apply
+ * @param <T> elements type in {@code List}
+ * @param <E> elements type in {@code Collection}
+ * @return new {@code List} that contains elements that satisfy {@code Predicate} after applying mapper {@code Function} to the input {@code Collection}
+ */
+ public static <T, E> List<T> toList(Collection<E> collection, Function<E, T> mapper, Predicate<T> predicate) {
+ Preconditions.checkNotNull(collection);
+ Preconditions.checkNotNull(mapper);
+ Preconditions.checkNotNull(predicate);
+ ArrayList<T> list = new ArrayList<>(collection.size());
+ collection.forEach(e -> {
+ T t = mapper.apply(e);
+ if (predicate.test(t)) {
+ list.add(t);
+ }
+ });
+ return list;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
index ecc5579..3c2bbfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store;
import java.io.IOException;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@@ -30,8 +29,8 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.stream.Collectors;
+import org.apache.drill.common.collections.Collectors;
import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
@@ -212,11 +211,7 @@ public abstract class TimedCallable<V> implements Callable<V> {
final FutureMapper<V> futureMapper = new FutureMapper<>();
final Statistics<V> statistics = logger.isDebugEnabled() ? new Statistics<>() : null;
try {
- return threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS)
- .stream()
- .map(futureMapper)
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
+ return Collectors.toList(threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS), futureMapper);
} catch (InterruptedException e) {
final String errMsg = String.format("Interrupted while waiting for activity '%s' tasks to be done.", activity);
logger.error(errMsg, e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 49a6b52..cdf98e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.collections.Collectors;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.drill.exec.store.TimedCallable;
@@ -68,7 +69,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.stream.Collectors;
import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
@@ -294,7 +294,7 @@ public class Metadata {
Map<FileStatus, FileSystem> fileStatusMap = fileStatuses.stream()
.collect(
- Collectors.toMap(
+ java.util.stream.Collectors.toMap(
Function.identity(),
s -> fs,
(oldFs, newFs) -> newFs,
@@ -335,14 +335,11 @@ public class Metadata {
*/
private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(
ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException {
-
- List<TimedCallable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream()
- .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(), e.getValue()))
- .collect(Collectors.toList());
-
- List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>();
- metaDataList.addAll(TimedCallable.run("Fetch parquet metadata", logger, gatherers, 16));
- return metaDataList;
+ return TimedCallable.run("Fetch parquet metadata", logger,
+ Collectors.toList(fileStatusMap,
+ (fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v3, fileStatus, fileSystem)),
+ 16
+ );
}
/**
--
To stop receiving notification emails like this one, please contact
volodymyr@apache.org.