You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/11/22 14:31:22 UTC
[3/4] incubator-beam git commit: [BEAM-59] Use ServiceLoader to
register IOChannelFactories in IOChannelUtils.
[BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa417f9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa417f9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa417f9c
Branch: refs/heads/master
Commit: fa417f9c2c671626eba3326e82d47741000ec64d
Parents: cd1a5e7
Author: Pei He <pe...@google.com>
Authored: Mon Oct 31 18:02:49 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 22 06:18:55 2016 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../options/DataflowPipelineOptionsTest.java | 6 +-
.../runners/dataflow/util/PackageUtilTest.java | 2 +-
.../sdk/options/PipelineOptionsFactory.java | 32 +----
.../apache/beam/sdk/runners/PipelineRunner.java | 2 +-
.../apache/beam/sdk/testing/TestPipeline.java | 2 +-
.../beam/sdk/util/FileIOChannelFactory.java | 10 +-
.../sdk/util/IOChannelFactoryRegistrar.java | 11 +-
.../apache/beam/sdk/util/IOChannelUtils.java | 133 ++++++++++++++++++-
.../beam/sdk/util/common/ReflectHelpers.java | 29 ++++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 2 +-
.../apache/beam/sdk/io/FileBasedSourceTest.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 2 +-
.../sdk/options/PipelineOptionsFactoryTest.java | 34 -----
.../util/FileIOChannelFactoryRegistrarTest.java | 4 +-
.../beam/sdk/util/FileIOChannelFactoryTest.java | 2 +-
.../util/GcsIOChannelFactoryRegistrarTest.java | 4 +-
.../beam/sdk/util/IOChannelUtilsTest.java | 39 ++++++
.../sdk/util/common/ReflectHelpersTest.java | 33 +++++
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 6 +-
20 files changed, 259 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 841b13f..36328e9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -240,7 +240,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
public static DataflowRunner fromOptions(PipelineOptions options) {
// (Re-)register standard IO factories. Clobbers any prior credentials.
- IOChannelUtils.registerStandardIOFactories(options);
+ IOChannelUtils.registerIOFactoriesAllowOverride(options);
DataflowPipelineOptions dataflowOptions =
PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 202d04b..52082e0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -126,7 +126,7 @@ public class DataflowPipelineOptionsTest {
@Test
public void testStagingLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- IOChannelUtils.registerStandardIOFactories(options);
+ IOChannelUtils.registerIOFactoriesAllowOverride(options);
options.setTempLocation("file://temp_location");
options.setStagingLocation("gs://staging_location");
assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
@@ -136,7 +136,7 @@ public class DataflowPipelineOptionsTest {
@Test
public void testDefaultToTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- IOChannelUtils.registerStandardIOFactories(options);
+ IOChannelUtils.registerIOFactoriesAllowOverride(options);
options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location");
assertEquals("gs://temp_location", options.getGcpTempLocation());
@@ -146,7 +146,7 @@ public class DataflowPipelineOptionsTest {
@Test
public void testDefaultToGcpTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- IOChannelUtils.registerStandardIOFactories(options);
+ IOChannelUtils.registerIOFactoriesAllowOverride(options);
options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location");
options.setGcpTempLocation("gs://gcp_temp_location");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 02aceef..05a87dd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -135,7 +135,7 @@ public class PackageUtilTest {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGcsUtil(mockGcsUtil);
- IOChannelUtils.registerStandardIOFactories(pipelineOptions);
+ IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
}
private File makeFileWithContents(String name, String contents) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 304e166..6009867 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -481,23 +481,6 @@ public class PipelineOptionsFactory {
/** The width at which options should be output. */
private static final int TERMINAL_WIDTH = 80;
- /**
- * Finds the appropriate {@code ClassLoader} to be used by the
- * {@link ServiceLoader#load} call, which by default would use the context
- * {@code ClassLoader}, which can be null. The fallback is as follows: context
- * ClassLoader, class ClassLoader and finally the system ClassLoader.
- */
- static ClassLoader findClassLoader() {
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- if (classLoader == null) {
- classLoader = PipelineOptionsFactory.class.getClassLoader();
- }
- if (classLoader == null) {
- classLoader = ClassLoader.getSystemClassLoader();
- }
- return classLoader;
- }
-
static {
try {
IGNORED_METHODS = ImmutableSet.<Method>builder()
@@ -514,10 +497,10 @@ public class PipelineOptionsFactory {
throw new ExceptionInInitializerError(e);
}
- CLASS_LOADER = findClassLoader();
+ CLASS_LOADER = ReflectHelpers.findClassLoader();
Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars =
- Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+ Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
pipelineRunnerRegistrars.addAll(
Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER)));
// Store the list of all available pipeline runners.
@@ -579,7 +562,7 @@ public class PipelineOptionsFactory {
private static void initializeRegistry() {
register(PipelineOptions.class);
Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
- Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+ Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
pipelineOptionsRegistrars.addAll(
Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, CLASS_LOADER)));
for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
@@ -1390,15 +1373,6 @@ public class PipelineOptionsFactory {
}
}
- /** A {@link Comparator} that uses the object's classes canonical name to compare them. */
- private static class ObjectsClassComparator implements Comparator<Object> {
- static final ObjectsClassComparator INSTANCE = new ObjectsClassComparator();
- @Override
- public int compare(Object o1, Object o2) {
- return o1.getClass().getCanonicalName().compareTo(o2.getClass().getCanonicalName());
- }
- }
-
/** A {@link Comparator} that uses the generic method signature to sort them. */
private static class MethodComparator implements Comparator<Method> {
static final MethodComparator INSTANCE = new MethodComparator();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index ede1507..77f5128 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -48,7 +48,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
checkNotNull(options);
// (Re-)register standard IO factories. Clobbers any prior credentials.
- IOChannelUtils.registerStandardIOFactories(gcsOptions);
+ IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions);
@SuppressWarnings("unchecked")
PipelineRunner<? extends PipelineResult> result =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index f1bf09d..493d4cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -152,7 +152,7 @@ public class TestPipeline extends Pipeline {
}
options.setStableUniqueNames(CheckEnabled.ERROR);
- IOChannelUtils.registerStandardIOFactories(options);
+ IOChannelUtils.registerIOFactoriesAllowOverride(options);
return options;
} catch (IOException e) {
throw new RuntimeException("Unable to instantiate test options from system property "
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index 13591a3..dd81a34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -44,6 +44,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,14 +58,7 @@ public class FileIOChannelFactory implements IOChannelFactory {
/**
* Create a {@link FileIOChannelFactory} with the given {@link PipelineOptions}.
*/
- public static FileIOChannelFactory fromOptions(PipelineOptions options) {
- return create();
- }
-
- /**
- * Create a {@link FileIOChannelFactory}.
- */
- public static FileIOChannelFactory create() {
+ public static FileIOChannelFactory fromOptions(@Nullable PipelineOptions options) {
return new FileIOChannelFactory();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
index 93752a4..7776b13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
@@ -22,7 +22,7 @@ import java.util.ServiceLoader;
import org.apache.beam.sdk.options.PipelineOptions;
/**
- * A registrar that creates {@link IOChannelFactory} from {@link PipelineOptions}.
+ * A registrar that creates {@link IOChannelFactory} instances from {@link PipelineOptions}.
*
* <p>{@link IOChannelFactory} creators have the ability to provide a registrar by creating
* a {@link ServiceLoader} entry and a concrete implementation of this interface.
@@ -32,12 +32,17 @@ import org.apache.beam.sdk.options.PipelineOptions;
*/
public interface IOChannelFactoryRegistrar {
/**
- * Create a {@link IOChannelFactory} with the given {@link PipelineOptions}.
+ * Create a {@link IOChannelFactory} from the given {@link PipelineOptions}.
*/
IOChannelFactory fromOptions(PipelineOptions options);
/**
- * Get the scheme.
+ * Get the URI scheme which defines the namespace of the IOChannelFactoryRegistrar.
+ *
+ * <p>The scheme is required to be unique among all
+ * {@link IOChannelFactoryRegistrar IOChannelFactoryRegistrars}.
+ *
+ * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
*/
String getScheme();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index d221fa9..d60ee97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -17,19 +17,33 @@
*/
package org.apache.beam.sdk.util;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.text.DecimalFormat;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.ServiceLoader;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
/**
* Provides utilities for creating read and write channels.
@@ -42,6 +56,8 @@ public class IOChannelUtils {
// Pattern that matches shard placeholders within a shard template.
private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
+ private static final ClassLoader CLASS_LOADER = ReflectHelpers.findClassLoader();
+
/**
* Associates a scheme with an {@link IOChannelFactory}.
*
@@ -50,18 +66,123 @@ public class IOChannelUtils {
*
* <p>For example, when reading from "gs://bucket/path", the scheme "gs" is
* used to lookup the appropriate factory.
+ *
+ * <p>{@link PipelineOptions} are required to provide dependencies and
+ * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+ *
+ * @throws IllegalStateException if multiple {@link IOChannelFactory IOChannelFactories}
+ * for the same scheme are detected.
*/
- public static void setIOFactory(String scheme, IOChannelFactory factory) {
+ @VisibleForTesting
+ public static void setIOFactoryInternal(
+ String scheme,
+ IOChannelFactory factory,
+ boolean override) {
+ if (!override && FACTORY_MAP.containsKey(scheme)) {
+ throw new IllegalStateException(String.format(
+ "Failed to register IOChannelFactory: %s. "
+ + "Scheme: [%s] is already registered with %s, and override is not allowed.",
+ FACTORY_MAP.get(scheme).getClass(),
+ scheme,
+ factory.getClass()));
+ }
FACTORY_MAP.put(scheme, factory);
}
/**
- * Registers standard factories globally. This requires {@link PipelineOptions}
- * to provide, e.g., credentials for GCS.
+ * Deregisters the scheme and the associated {@link IOChannelFactory}.
+ */
+ @VisibleForTesting
+ static void deregisterScheme(String scheme) {
+ FACTORY_MAP.remove(scheme);
+ }
+
+ /**
+ * Registers standard factories globally.
+ *
+ * <p>{@link PipelineOptions} are required to provide dependencies and
+ * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+ *
+ * @deprecated use {@link #registerIOFactories}.
*/
+ @Deprecated
public static void registerStandardIOFactories(PipelineOptions options) {
- setIOFactory("gs", GcsIOChannelFactory.fromOptions(options));
- setIOFactory("file", FileIOChannelFactory.fromOptions(options));
+ registerIOFactoriesAllowOverride(options);
+ }
+
+ /**
+ * Registers all {@link IOChannelFactory IOChannelFactories} from {@link ServiceLoader}.
+ *
+ * <p>{@link PipelineOptions} are required to provide dependencies and
+ * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+ *
+ * <p>Multiple {@link IOChannelFactory IOChannelFactories} for the same scheme are not allowed.
+ *
+ * @throws IllegalStateException if multiple {@link IOChannelFactory IOChannelFactories}
+ * for the same scheme are detected.
+ */
+ public static void registerIOFactories(PipelineOptions options) {
+ registerIOFactoriesInternal(options, false /* override */);
+ }
+
+ /**
+ * Registers all {@link IOChannelFactory IOChannelFactories} from {@link ServiceLoader}.
+ *
+ * <p>This requires {@link PipelineOptions} to provide, e.g., credentials for GCS.
+ *
+ * <p>Overriding existing schemes is allowed.
+ *
+ * @deprecated This currently exists to provide different configurations for tests and
+ * is still public for IOChannelFactory redesign purposes.
+ */
+ @Deprecated
+ @VisibleForTesting
+ public static void registerIOFactoriesAllowOverride(PipelineOptions options) {
+ registerIOFactoriesInternal(options, true /* override */);
+ }
+
+ private static void registerIOFactoriesInternal(
+ PipelineOptions options, boolean override) {
+ Set<IOChannelFactoryRegistrar> registrars =
+ Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
+ registrars.addAll(Lists.newArrayList(
+ ServiceLoader.load(IOChannelFactoryRegistrar.class, CLASS_LOADER)));
+
+ checkDuplicateScheme(registrars);
+
+ for (IOChannelFactoryRegistrar registrar : registrars) {
+ setIOFactoryInternal(
+ registrar.getScheme(),
+ registrar.fromOptions(options),
+ override);
+ }
+ }
+
+ @VisibleForTesting
+ static void checkDuplicateScheme(Set<IOChannelFactoryRegistrar> registrars) {
+ Multimap<String, IOChannelFactoryRegistrar> registrarsBySchemes =
+ TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());
+
+ for (IOChannelFactoryRegistrar registrar : registrars) {
+ registrarsBySchemes.put(registrar.getScheme(), registrar);
+ }
+ for (Entry<String, Collection<IOChannelFactoryRegistrar>> entry
+ : registrarsBySchemes.asMap().entrySet()) {
+ if (entry.getValue().size() > 1) {
+ String conflictingRegistrars = Joiner.on(", ").join(
+ FluentIterable.from(entry.getValue())
+ .transform(new Function<IOChannelFactoryRegistrar, String>() {
+ @Override
+ public String apply(@Nonnull IOChannelFactoryRegistrar input) {
+ return input.getClass().getName();
+ }})
+ .toSortedList(Ordering.<String>natural()));
+ throw new IllegalStateException(String.format(
+ "Scheme: [%s] has conflicting registrars: [%s]",
+ entry.getKey(),
+ conflictingRegistrars));
+ }
+ }
}
/**
@@ -174,7 +295,7 @@ public class IOChannelUtils {
Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
if (!matcher.matches()) {
- return FileIOChannelFactory.create();
+ return FileIOChannelFactory.fromOptions(null);
}
String scheme = matcher.group("scheme");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 2b08fee..637e8e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -34,9 +34,12 @@ import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.Queue;
+import java.util.ServiceLoader;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.IOChannelUtils;
/**
* Utilities for working with {@link Class Classes} and {@link Method Methods}.
@@ -167,6 +170,15 @@ public class ReflectHelpers {
}
};
+ /** A {@link Comparator} that uses the object's classes canonical name to compare them. */
+ public static class ObjectsClassComparator implements Comparator<Object> {
+ public static final ObjectsClassComparator INSTANCE = new ObjectsClassComparator();
+ @Override
+ public int compare(Object o1, Object o2) {
+ return o1.getClass().getCanonicalName().compareTo(o2.getClass().getCanonicalName());
+ }
+ }
+
/**
* Returns all the methods visible from the provided interfaces.
*
@@ -203,4 +215,21 @@ public class ReflectHelpers {
}
return builder.build();
}
+
+ /**
+ * Finds the appropriate {@code ClassLoader} to be used by the
+ * {@link ServiceLoader#load} call, which by default would use the context
+ * {@code ClassLoader}, which can be null. The fallback is as follows: context
+ * ClassLoader, class ClassLoader and finally the system ClassLoader.
+ */
+ public static ClassLoader findClassLoader() {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = IOChannelUtils.class.getClassLoader();
+ }
+ if (classLoader == null) {
+ classLoader = ClassLoader.getSystemClassLoader();
+ }
+ return classLoader;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 1a07177..41a630f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -79,7 +79,7 @@ public class AvroIOTest {
@BeforeClass
public static void setupClass() {
- IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
+ IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 5208910..dde5d02 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -462,7 +462,7 @@ public class FileBasedSourceTest {
new File(parent, "file1").getPath(),
new File(parent, "file2").getPath(),
new File(parent, "file3").getPath()));
- IOChannelUtils.setIOFactory("mocked", mockIOFactory);
+ IOChannelUtils.setIOFactoryInternal("mocked", mockIOFactory, true /* override */);
List<String> data2 = createStringDataset(3, 50);
createFileWithData("file2", data2);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index dc71693..d3a5d5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -175,7 +175,7 @@ public class TextIOTest {
@BeforeClass
public static void setupClass() throws IOException {
- IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
+ IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
tempFolder = Files.createTempDirectory("TextIOTest");
// empty files
emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 0a2324f..7ff4a92 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -1461,40 +1461,6 @@ public class PipelineOptionsFactoryTest {
containsString("The pipeline runner that will be used to execute the pipeline."));
}
- @Test
- public void testFindProperClassLoaderIfContextClassLoaderIsNull() throws InterruptedException {
- final ClassLoader[] classLoader = new ClassLoader[1];
- Thread thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- classLoader[0] = PipelineOptionsFactory.findClassLoader();
- }
- });
- thread.setContextClassLoader(null);
- thread.start();
- thread.join();
- assertEquals(PipelineOptionsFactory.class.getClassLoader(), classLoader[0]);
- }
-
- @Test
- public void testFindProperClassLoaderIfContextClassLoaderIsAvailable()
- throws InterruptedException {
- final ClassLoader[] classLoader = new ClassLoader[1];
- Thread thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- classLoader[0] = PipelineOptionsFactory.findClassLoader();
- }
- });
- ClassLoader cl = new ClassLoader() {};
- thread.setContextClassLoader(cl);
- thread.start();
- thread.join();
- assertEquals(cl, classLoader[0]);
- }
-
private static class RegisteredTestRunner extends PipelineRunner<PipelineResult> {
public static PipelineRunner<PipelineResult> fromOptions(PipelineOptions options) {
return new RegisteredTestRunner();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
index 4600d81..f8f53e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
@@ -33,8 +33,8 @@ public class FileIOChannelFactoryRegistrarTest {
@Test
public void testServiceLoader() {
- for (IOChannelFactoryRegistrar registrar :
- Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+ for (IOChannelFactoryRegistrar registrar
+ : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
if (registrar instanceof FileIOChannelFactoryRegistrar) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
index e27a043..38be65a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
@@ -46,7 +46,7 @@ import org.junit.runners.JUnit4;
public class FileIOChannelFactoryTest {
@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
- private FileIOChannelFactory factory = FileIOChannelFactory.create();
+ private FileIOChannelFactory factory = FileIOChannelFactory.fromOptions(null);
private void testCreate(Path path) throws Exception {
String expected = "my test string";
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
index 32bd4fc..a29dd45 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -33,8 +33,8 @@ public class GcsIOChannelFactoryRegistrarTest {
@Test
public void testServiceLoader() {
- for (IOChannelFactoryRegistrar registrar :
- Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+ for (IOChannelFactoryRegistrar registrar
+ : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
if (registrar instanceof GcsIOChannelFactoryRegistrar) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
index d92d3cd..6dfa4c7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
@@ -19,15 +19,19 @@ package org.apache.beam.sdk.util;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import com.google.common.collect.Sets;
import com.google.common.io.Files;
import java.io.File;
import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -40,6 +44,9 @@ public class IOChannelUtilsTest {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
@Test
public void testShardFormatExpansion() {
assertEquals("output-001-of-123.txt",
@@ -106,4 +113,36 @@ public class IOChannelUtilsTest {
assertEquals(expected,
IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "aa", "bb", "cc"));
}
+
+ @Test // Registering twice with override allowed is expected to complete without throwing.
+ public void testRegisterIOFactoriesAllowOverride() throws Exception {
+ IOChannelUtils.registerIOFactoriesAllowOverride(PipelineOptionsFactory.create());
+ IOChannelUtils.registerIOFactoriesAllowOverride(PipelineOptionsFactory.create()); // repeated on purpose
+ assertNotNull(IOChannelUtils.getFactory("gs")); // lookup for "gs" must return non-null
+ assertNotNull(IOChannelUtils.getFactory("file")); // same for "file"
+ }
+
+ @Test // First registerIOFactories call must succeed; the second is expected to throw.
+ public void testRegisterIOFactories() throws Exception {
+ IOChannelUtils.deregisterScheme("gs"); // presumably so the first call below has nothing to collide with
+ IOChannelUtils.deregisterScheme("file");
+
+ IOChannelUtils.registerIOFactories(PipelineOptionsFactory.create());
+ assertNotNull(IOChannelUtils.getFactory("gs"));
+ assertNotNull(IOChannelUtils.getFactory("file"));
+ thrown.expect(RuntimeException.class); // set only now, so the calls above are not covered by it
+ thrown.expectMessage("Failed to register IOChannelFactory");
+ thrown.expectMessage("override is not allowed");
+ IOChannelUtils.registerIOFactories(PipelineOptionsFactory.create()); // second call, expected to fail
+ }
+
+ @Test // Two registrars of the same class in one set are expected to be reported as a conflict.
+ public void testCheckDuplicateScheme() throws Exception {
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Scheme: [file] has conflicting registrars");
+ IOChannelUtils.checkDuplicateScheme(
+ Sets.<IOChannelFactoryRegistrar>newHashSet( // NOTE(review): relies on the two instances not being equal() - confirm
+ new FileIOChannelFactoryRegistrar(),
+ new FileIOChannelFactoryRegistrar()));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
index 8a1708c..5fae25f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
@@ -148,4 +148,37 @@ public class ReflectHelpersTest {
Options.class.getMethod("getObject").getAnnotations()[0]));
}
+ @Test // With a null context class loader, findClassLoader() must return ReflectHelpers' own loader.
+ public void testFindProperClassLoaderIfContextClassLoaderIsNull() throws InterruptedException {
+ final ClassLoader[] classLoader = new ClassLoader[1]; // one-element array so the Runnable can store its result
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ classLoader[0] = ReflectHelpers.findClassLoader(); // executed on the new thread
+ }
+ });
+ thread.setContextClassLoader(null);
+ thread.start();
+ thread.join(); // wait for run() to finish before reading classLoader[0]
+ assertEquals(ReflectHelpers.class.getClassLoader(), classLoader[0]);
+ }
+
+ @Test // With a context class loader set on the thread, findClassLoader() must return that loader.
+ public void testFindProperClassLoaderIfContextClassLoaderIsAvailable()
+ throws InterruptedException {
+ final ClassLoader[] classLoader = new ClassLoader[1]; // one-element array so the Runnable can store its result
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ classLoader[0] = ReflectHelpers.findClassLoader(); // executed on the new thread
+ }
+ });
+ ClassLoader cl = new ClassLoader() {}; // fresh anonymous loader, used as the expected value below
+ thread.setContextClassLoader(cl);
+ thread.start();
+ thread.join(); // wait for run() to finish before reading classLoader[0]
+ assertEquals(cl, classLoader[0]);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 51a69a2..40965e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1420,7 +1420,7 @@ public class BigQueryIOTest implements Serializable {
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation("mock://tempLocation");
- IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+ IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
when(mockIOChannelFactory.resolve(anyString(), anyString()))
.thenReturn("mock://tempLocation/output");
when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
@@ -1501,7 +1501,7 @@ public class BigQueryIOTest implements Serializable {
eq(destinationTable.getDatasetId()),
eq(destinationTable.getTableId())))
.thenReturn(new Table().setSchema(new TableSchema()));
- IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+ IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
when(mockIOChannelFactory.resolve(anyString(), anyString()))
.thenReturn("mock://tempLocation/output");
when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
@@ -1584,7 +1584,7 @@ public class BigQueryIOTest implements Serializable {
eq(destinationTable.getDatasetId()),
eq(destinationTable.getTableId())))
.thenReturn(new Table().setSchema(new TableSchema()));
- IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+ IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
when(mockIOChannelFactory.resolve(anyString(), anyString()))
.thenReturn("mock://tempLocation/output");
when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))