You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/11/22 14:31:20 UTC

[1/4] incubator-beam git commit: [BEAM-59] Create IOChannelFactoryRegistrar interface and its gcs/file implementations.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 249dbc045 -> e53d6d458


[BEAM-59] Create IOChannelFactoryRegistrar interface and its gcs/file implementations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd1a5e7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd1a5e7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd1a5e7e

Branch: refs/heads/master
Commit: cd1a5e7e30a3bd46f822d371333afa975fc7e4af
Parents: e6fa2ff
Author: Pei He <pe...@google.com>
Authored: Mon Oct 31 18:01:41 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 22 06:18:54 2016 -0800

----------------------------------------------------------------------
 .../sdk/util/FileIOChannelFactoryRegistrar.java | 38 +++++++++++++++++
 .../sdk/util/GcsIOChannelFactoryRegistrar.java  | 38 +++++++++++++++++
 .../sdk/util/IOChannelFactoryRegistrar.java     | 43 +++++++++++++++++++
 .../util/FileIOChannelFactoryRegistrarTest.java | 44 ++++++++++++++++++++
 .../util/GcsIOChannelFactoryRegistrarTest.java  | 44 ++++++++++++++++++++
 5 files changed, 207 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd1a5e7e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrar.java
new file mode 100644
index 0000000..acc0222
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrar.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for {@link FileIOChannelFactory} ({@code "file"} scheme).
+ */
+@AutoService(IOChannelFactoryRegistrar.class)
+public class FileIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar {
+
+  @Override
+  public IOChannelFactory fromOptions(PipelineOptions options) {
+    return FileIOChannelFactory.fromOptions(options);
+  }
+
+  @Override
+  public String getScheme() {
+    return "file";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd1a5e7e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
new file mode 100644
index 0000000..b4c457f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for {@link GcsIOChannelFactory} ({@code "gs"} scheme).
+ */
+@AutoService(IOChannelFactoryRegistrar.class)
+public class GcsIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar {
+
+  @Override
+  public GcsIOChannelFactory fromOptions(PipelineOptions options) {
+    return GcsIOChannelFactory.fromOptions(options);
+  }
+
+  @Override
+  public String getScheme() {
+    return "gs";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd1a5e7e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
new file mode 100644
index 0000000..93752a4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auto.service.AutoService;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A registrar that creates an {@link IOChannelFactory} from {@link PipelineOptions}.
+ *
+ * <p>{@link IOChannelFactory} authors can provide a registrar by adding a
+ * {@link ServiceLoader} entry and a concrete implementation of this interface.
+ *
+ * <p>It is optional but recommended to use a build-time tool such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+public interface IOChannelFactoryRegistrar {
+  /**
+   * Creates an {@link IOChannelFactory} configured from the given {@link PipelineOptions}.
+   */
+  IOChannelFactory fromOptions(PipelineOptions options);
+
+  /**
+   * Returns the URI scheme (e.g. {@code "gs"}, {@code "file"}) served by the created factory.
+   */
+  String getScheme();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd1a5e7e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
new file mode 100644
index 0000000..4600d81
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests that {@link FileIOChannelFactoryRegistrar} is discoverable via {@link ServiceLoader}.
+ */
+@RunWith(JUnit4.class)
+public class FileIOChannelFactoryRegistrarTest {
+
+  @Test
+  public void testServiceLoader() {
+    for (IOChannelFactoryRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+      if (registrar instanceof FileIOChannelFactoryRegistrar) {
+        return;  // Registrar was found by the ServiceLoader; the test passes.
+      }
+    }
+    fail("Expected to find " + FileIOChannelFactoryRegistrar.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd1a5e7e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
new file mode 100644
index 0000000..32bd4fc
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests that {@link GcsIOChannelFactoryRegistrar} is discoverable via {@link ServiceLoader}.
+ */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryRegistrarTest {
+
+  @Test
+  public void testServiceLoader() {
+    for (IOChannelFactoryRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+      if (registrar instanceof GcsIOChannelFactoryRegistrar) {
+        return;  // Registrar was found by the ServiceLoader; the test passes.
+      }
+    }
+    fail("Expected to find " + GcsIOChannelFactoryRegistrar.class);
+  }
+}


[4/4] incubator-beam git commit: [BEAM-952] Use ServiceLoader to register IOChannelFactories.

Posted by lc...@apache.org.
[BEAM-952] Use ServiceLoader to register IOChannelFactories.

This closes #1255


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e53d6d45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e53d6d45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e53d6d45

Branch: refs/heads/master
Commit: e53d6d45895aee6c61fd8060b20751875352a4ee
Parents: 249dbc0 fa417f9
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 22 06:19:30 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 22 06:19:30 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../options/DataflowPipelineOptionsTest.java    |   6 +-
 .../runners/dataflow/util/PackageUtilTest.java  |   2 +-
 .../sdk/options/PipelineOptionsFactory.java     |  32 +----
 .../apache/beam/sdk/runners/PipelineRunner.java |   2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |   2 +-
 .../beam/sdk/util/FileIOChannelFactory.java     |  11 ++
 .../sdk/util/FileIOChannelFactoryRegistrar.java |  38 ++++++
 .../beam/sdk/util/GcsIOChannelFactory.java      |  10 +-
 .../sdk/util/GcsIOChannelFactoryRegistrar.java  |  38 ++++++
 .../sdk/util/IOChannelFactoryRegistrar.java     |  48 +++++++
 .../apache/beam/sdk/util/IOChannelUtils.java    | 134 ++++++++++++++++++-
 .../beam/sdk/util/common/ReflectHelpers.java    |  29 ++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   2 +-
 .../sdk/options/PipelineOptionsFactoryTest.java |  34 -----
 .../util/FileIOChannelFactoryRegistrarTest.java |  44 ++++++
 .../beam/sdk/util/FileIOChannelFactoryTest.java |   2 +-
 .../util/GcsIOChannelFactoryRegistrarTest.java  |  44 ++++++
 .../beam/sdk/util/GcsIOChannelFactoryTest.java  |   2 +-
 .../beam/sdk/util/IOChannelUtilsTest.java       |  39 ++++++
 .../sdk/util/common/ReflectHelpersTest.java     |  33 +++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   6 +-
 24 files changed, 478 insertions(+), 86 deletions(-)
----------------------------------------------------------------------



[2/4] incubator-beam git commit: [BEAM-59] Drops public constructors and uses Factory methods in Gcs/File/IOChannelFactory.

Posted by lc...@apache.org.
[BEAM-59] Drops public constructors and uses Factory methods in Gcs/File/IOChannelFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e6fa2ff2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e6fa2ff2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e6fa2ff2

Branch: refs/heads/master
Commit: e6fa2ff26e836848fa549c290ed098dd019cc4e1
Parents: 249dbc0
Author: Pei He <pe...@google.com>
Authored: Mon Oct 31 17:58:31 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 22 06:18:54 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/util/FileIOChannelFactory.java | 17 +++++++++++++++++
 .../apache/beam/sdk/util/GcsIOChannelFactory.java  | 10 +++++++++-
 .../org/apache/beam/sdk/util/IOChannelUtils.java   |  7 +++----
 .../beam/sdk/util/FileIOChannelFactoryTest.java    |  2 +-
 .../beam/sdk/util/GcsIOChannelFactoryTest.java     |  2 +-
 5 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6fa2ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index 0eefb77..13591a3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Matcher;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +54,22 @@ import org.slf4j.LoggerFactory;
 public class FileIOChannelFactory implements IOChannelFactory {
   private static final Logger LOG = LoggerFactory.getLogger(FileIOChannelFactory.class);
 
+   /**
+   * Create a {@link FileIOChannelFactory} with the given {@link PipelineOptions}.
+   */
+  public static FileIOChannelFactory fromOptions(PipelineOptions options) {
+    return create();
+  }
+
+  /**
+   * Create a {@link FileIOChannelFactory}.
+   */
+  public static FileIOChannelFactory create() {
+    return new FileIOChannelFactory();
+  }
+
+  private FileIOChannelFactory() {}
+
   /**
    *  Converts the given file spec to a java {@link File}. If {@code spec} is actually a URI with
    *  the {@code file} scheme, then this function will ensure that the returned {@link File}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6fa2ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
index bd2ec4e..9f99cd6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
 /**
@@ -32,9 +33,16 @@ import org.apache.beam.sdk.util.gcsfs.GcsPath;
  */
 public class GcsIOChannelFactory implements IOChannelFactory {
 
+  /**
+   * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}.
+   */
+  public static GcsIOChannelFactory fromOptions(PipelineOptions options) {
+    return new GcsIOChannelFactory(options.as(GcsOptions.class));
+  }
+
   private final GcsOptions options;
 
-  public GcsIOChannelFactory(GcsOptions options) {
+  private GcsIOChannelFactory(GcsOptions options) {
     this.options = options;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6fa2ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index 16a6e95..d221fa9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
@@ -61,8 +60,8 @@ public class IOChannelUtils {
    * to provide, e.g., credentials for GCS.
    */
   public static void registerStandardIOFactories(PipelineOptions options) {
-    setIOFactory("gs", new GcsIOChannelFactory(options.as(GcsOptions.class)));
-    setIOFactory("file", new FileIOChannelFactory());
+    setIOFactory("gs", GcsIOChannelFactory.fromOptions(options));
+    setIOFactory("file", FileIOChannelFactory.fromOptions(options));
   }
 
   /**
@@ -175,7 +174,7 @@ public class IOChannelUtils {
     Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
 
     if (!matcher.matches()) {
-      return new FileIOChannelFactory();
+      return FileIOChannelFactory.create();
     }
 
     String scheme = matcher.group("scheme");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6fa2ff2/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
index 4d4f93b..e27a043 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
@@ -46,7 +46,7 @@ import org.junit.runners.JUnit4;
 public class FileIOChannelFactoryTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private FileIOChannelFactory factory = new FileIOChannelFactory();
+  private FileIOChannelFactory factory = FileIOChannelFactory.create();
 
   private void testCreate(Path path) throws Exception {
     String expected = "my test string";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6fa2ff2/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
index 6bdb782..7248b38 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
@@ -33,7 +33,7 @@ public class GcsIOChannelFactoryTest {
 
   @Before
   public void setUp() {
-    factory = new GcsIOChannelFactory(PipelineOptionsFactory.as(GcsOptions.class));
+    factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class));
   }
 
   @Test


[3/4] incubator-beam git commit: [BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils.

Posted by lc...@apache.org.
[BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa417f9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa417f9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa417f9c

Branch: refs/heads/master
Commit: fa417f9c2c671626eba3326e82d47741000ec64d
Parents: cd1a5e7
Author: Pei He <pe...@google.com>
Authored: Mon Oct 31 18:02:49 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Nov 22 06:18:55 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../options/DataflowPipelineOptionsTest.java    |   6 +-
 .../runners/dataflow/util/PackageUtilTest.java  |   2 +-
 .../sdk/options/PipelineOptionsFactory.java     |  32 +----
 .../apache/beam/sdk/runners/PipelineRunner.java |   2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |   2 +-
 .../beam/sdk/util/FileIOChannelFactory.java     |  10 +-
 .../sdk/util/IOChannelFactoryRegistrar.java     |  11 +-
 .../apache/beam/sdk/util/IOChannelUtils.java    | 133 ++++++++++++++++++-
 .../beam/sdk/util/common/ReflectHelpers.java    |  29 ++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   2 +-
 .../sdk/options/PipelineOptionsFactoryTest.java |  34 -----
 .../util/FileIOChannelFactoryRegistrarTest.java |   4 +-
 .../beam/sdk/util/FileIOChannelFactoryTest.java |   2 +-
 .../util/GcsIOChannelFactoryRegistrarTest.java  |   4 +-
 .../beam/sdk/util/IOChannelUtilsTest.java       |  39 ++++++
 .../sdk/util/common/ReflectHelpersTest.java     |  33 +++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   6 +-
 20 files changed, 259 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 841b13f..36328e9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -240,7 +240,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    */
   public static DataflowRunner fromOptions(PipelineOptions options) {
     // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerStandardIOFactories(options);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
 
     DataflowPipelineOptions dataflowOptions =
         PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 202d04b..52082e0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -126,7 +126,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testStagingLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerStandardIOFactories(options);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
     options.setTempLocation("file://temp_location");
     options.setStagingLocation("gs://staging_location");
     assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
@@ -136,7 +136,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerStandardIOFactories(options);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location");
     assertEquals("gs://temp_location", options.getGcpTempLocation());
@@ -146,7 +146,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToGcpTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerStandardIOFactories(options);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location");
     options.setGcpTempLocation("gs://gcp_temp_location");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 02aceef..05a87dd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -135,7 +135,7 @@ public class PackageUtilTest {
     GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
     pipelineOptions.setGcsUtil(mockGcsUtil);
 
-    IOChannelUtils.registerStandardIOFactories(pipelineOptions);
+    IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
   }
 
   private File makeFileWithContents(String name, String contents) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 304e166..6009867 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -481,23 +481,6 @@ public class PipelineOptionsFactory {
   /** The width at which options should be output. */
   private static final int TERMINAL_WIDTH = 80;
 
-  /**
-   * Finds the appropriate {@code ClassLoader} to be used by the
-   * {@link ServiceLoader#load} call, which by default would use the context
-   * {@code ClassLoader}, which can be null. The fallback is as follows: context
-   * ClassLoader, class ClassLoader and finaly the system ClassLoader.
-   */
-  static ClassLoader findClassLoader() {
-    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-    if (classLoader == null) {
-      classLoader = PipelineOptionsFactory.class.getClassLoader();
-    }
-    if (classLoader == null) {
-      classLoader = ClassLoader.getSystemClassLoader();
-    }
-    return classLoader;
-  }
-
   static {
     try {
       IGNORED_METHODS = ImmutableSet.<Method>builder()
@@ -514,10 +497,10 @@ public class PipelineOptionsFactory {
       throw new ExceptionInInitializerError(e);
     }
 
-    CLASS_LOADER = findClassLoader();
+    CLASS_LOADER = ReflectHelpers.findClassLoader();
 
     Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars =
-        Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
     pipelineRunnerRegistrars.addAll(
         Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER)));
     // Store the list of all available pipeline runners.
@@ -579,7 +562,7 @@ public class PipelineOptionsFactory {
   private static void initializeRegistry() {
     register(PipelineOptions.class);
     Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
-        Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
     pipelineOptionsRegistrars.addAll(
         Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, CLASS_LOADER)));
     for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
@@ -1390,15 +1373,6 @@ public class PipelineOptionsFactory {
     }
   }
 
-  /** A {@link Comparator} that uses the object's classes canonical name to compare them. */
-  private static class ObjectsClassComparator implements Comparator<Object> {
-    static final ObjectsClassComparator INSTANCE = new ObjectsClassComparator();
-    @Override
-    public int compare(Object o1, Object o2) {
-      return o1.getClass().getCanonicalName().compareTo(o2.getClass().getCanonicalName());
-    }
-  }
-
   /** A {@link Comparator} that uses the generic method signature to sort them. */
   private static class MethodComparator implements Comparator<Method> {
     static final MethodComparator INSTANCE = new MethodComparator();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index ede1507..77f5128 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -48,7 +48,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
     checkNotNull(options);
 
     // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerStandardIOFactories(gcsOptions);
+    IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions);
 
     @SuppressWarnings("unchecked")
     PipelineRunner<? extends PipelineResult> result =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index f1bf09d..493d4cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -152,7 +152,7 @@ public class TestPipeline extends Pipeline {
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);
 
-      IOChannelUtils.registerStandardIOFactories(options);
+      IOChannelUtils.registerIOFactoriesAllowOverride(options);
       return options;
     } catch (IOException e) {
       throw new RuntimeException("Unable to instantiate test options from system property "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index 13591a3..dd81a34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Matcher;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,14 +58,7 @@ public class FileIOChannelFactory implements IOChannelFactory {
    /**
    * Create a {@link FileIOChannelFactory} with the given {@link PipelineOptions}.
    */
-  public static FileIOChannelFactory fromOptions(PipelineOptions options) {
-    return create();
-  }
-
-  /**
-   * Create a {@link FileIOChannelFactory}.
-   */
-  public static FileIOChannelFactory create() {
+  public static FileIOChannelFactory fromOptions(@Nullable PipelineOptions options) {
     return new FileIOChannelFactory();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
index 93752a4..7776b13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactoryRegistrar.java
@@ -22,7 +22,7 @@ import java.util.ServiceLoader;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
- * A registrar that creates {@link IOChannelFactory} from {@link PipelineOptions}.
+ * A registrar that creates {@link IOChannelFactory} instances from {@link PipelineOptions}.
  *
  * <p>{@link IOChannelFactory} creators have the ability to provide a registrar by creating
  * a {@link ServiceLoader} entry and a concrete implementation of this interface.
@@ -32,12 +32,17 @@ import org.apache.beam.sdk.options.PipelineOptions;
  */
 public interface IOChannelFactoryRegistrar {
   /**
-   * Create a {@link IOChannelFactory} with the given {@link PipelineOptions}.
+   * Create a {@link IOChannelFactory} from the given {@link PipelineOptions}.
    */
   IOChannelFactory fromOptions(PipelineOptions options);
 
   /**
-   * Get the scheme.
+   * Get the URI scheme which defines the namespace of the IOChannelFactoryRegistrar.
+   *
+   * <p>The scheme is required to be unique among all
+   * {@link IOChannelFactoryRegistrar IOChannelFactoryRegistrars}.
+   *
+   * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
    */
   String getScheme();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index d221fa9..d60ee97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -17,19 +17,33 @@
  */
 package org.apache.beam.sdk.util;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
 import java.text.DecimalFormat;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
  * Provides utilities for creating read and write channels.
@@ -42,6 +56,8 @@ public class IOChannelUtils {
   // Pattern that matches shard placeholders within a shard template.
   private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
 
+  private static final ClassLoader CLASS_LOADER = ReflectHelpers.findClassLoader();
+
   /**
    * Associates a scheme with an {@link IOChannelFactory}.
    *
@@ -50,18 +66,123 @@ public class IOChannelUtils {
    *
    * <p>For example, when reading from "gs://bucket/path", the scheme "gs" is
    * used to lookup the appropriate factory.
+   *
+   * <p>{@link PipelineOptions} are required to provide dependencies and
+   * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+   *
+   * @throws IllegalStateException if multiple {@link IOChannelFactory IOChannelFactories}
+   * for the same scheme are detected.
    */
-  public static void setIOFactory(String scheme, IOChannelFactory factory) {
+  @VisibleForTesting
+  public static void setIOFactoryInternal(
+      String scheme,
+      IOChannelFactory factory,
+      boolean override) {
+    if (!override && FACTORY_MAP.containsKey(scheme)) {
+      throw new IllegalStateException(String.format(
+          "Failed to register IOChannelFactory: %s. "
+              + "Scheme: [%s] is already registered with %s, and override is not allowed.",
+          factory.getClass(),  // the factory being rejected
+          scheme,
+          FACTORY_MAP.get(scheme).getClass()));  // the factory already registered
+    }
     FACTORY_MAP.put(scheme, factory);
   }
 
   /**
-   * Registers standard factories globally. This requires {@link PipelineOptions}
-   * to provide, e.g., credentials for GCS.
+   * Deregisters the scheme and the associated {@link IOChannelFactory}.
+   */
+  @VisibleForTesting
+  static void deregisterScheme(String scheme) {
+    FACTORY_MAP.remove(scheme);  // lets tests re-run registration without override
+  }
+
+  /**
+   * Registers all ServiceLoader-discovered factories globally, replacing existing ones.
+   *
+   * <p>{@link PipelineOptions} are required to provide dependencies and
+   * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+   *
+   * @deprecated use {@link #registerIOFactories}, which rejects already-registered schemes.
    */
+  @Deprecated
   public static void registerStandardIOFactories(PipelineOptions options) {
-    setIOFactory("gs", GcsIOChannelFactory.fromOptions(options));
-    setIOFactory("file", FileIOChannelFactory.fromOptions(options));
+    registerIOFactoriesAllowOverride(options);  // keeps the old replace-on-register behavior
+  }
+
+  /**
+   * Registers all {@link IOChannelFactory IOChannelFactories} from {@link ServiceLoader}.
+   *
+   * <p>{@link PipelineOptions} are required to provide dependencies and
+   * pipeline level configuration to the individual {@link IOChannelFactory IOChannelFactories}.
+   *
+   * <p>Fails if a scheme is claimed by several registrars or is already registered.
+   *
+   * @throws IllegalStateException if several registrars share a scheme, or if a scheme
+   * already has a registered {@link IOChannelFactory}.
+   */
+  public static void registerIOFactories(PipelineOptions options) {
+    registerIOFactoriesInternal(options, false /* override */);
+  }
+
+  /**
+   * Registers all {@link IOChannelFactory IOChannelFactories} from {@link ServiceLoader}.
+   *
+   * <p>This requires {@link PipelineOptions} to provide, e.g., credentials for GCS.
+   *
+   * <p>Replacing factories that are already registered for a scheme is allowed.
+   *
+   * @deprecated This exists so tests can re-register factories with different configurations,
+   * and is still public only for the IOChannelFactory redesign.
+   */
+  @Deprecated
+  @VisibleForTesting
+  public static void registerIOFactoriesAllowOverride(PipelineOptions options) {
+    registerIOFactoriesInternal(options, true /* override */);
+  }
+
+  private static void registerIOFactoriesInternal(
+      PipelineOptions options, boolean override) {
+    Set<IOChannelFactoryRegistrar> registrars =  // sorted by class name; one per class
+        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
+    registrars.addAll(Lists.newArrayList(
+        ServiceLoader.load(IOChannelFactoryRegistrar.class, CLASS_LOADER)));
+
+    checkDuplicateScheme(registrars);  // validate before registering anything
+
+    for (IOChannelFactoryRegistrar registrar : registrars) {
+      setIOFactoryInternal(  // may throw part-way; earlier schemes stay registered
+          registrar.getScheme(),
+          registrar.fromOptions(options),
+          override);
+    }
+  }
+
+  @VisibleForTesting
+  static void checkDuplicateScheme(Set<IOChannelFactoryRegistrar> registrars) {
+    Multimap<String, IOChannelFactoryRegistrar> registrarsBySchemes =
+        TreeMultimap.create(Ordering.<String>natural(), Ordering.arbitrary());  // identity-based
+
+    for (IOChannelFactoryRegistrar registrar : registrars) {
+      registrarsBySchemes.put(registrar.getScheme(), registrar);  // group by scheme
+    }
+    for (Entry<String, Collection<IOChannelFactoryRegistrar>> entry
+        : registrarsBySchemes.asMap().entrySet()) {
+      if (entry.getValue().size() > 1) {  // scheme claimed by 2+ registrars
+        String conflictingRegistrars = Joiner.on(", ").join(
+            FluentIterable.from(entry.getValue())
+                .transform(new Function<IOChannelFactoryRegistrar, String>() {
+                  @Override
+                  public String apply(@Nonnull IOChannelFactoryRegistrar input) {
+                    return input.getClass().getName();
+                  }})
+                .toSortedList(Ordering.<String>natural()));  // stable message order
+        throw new IllegalStateException(String.format(
+            "Scheme: [%s] has conflicting registrars: [%s]",
+            entry.getKey(),
+            conflictingRegistrars));
+      }
+    }
   }
 
   /**
@@ -174,7 +295,7 @@ public class IOChannelUtils {
     Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
 
     if (!matcher.matches()) {
-      return FileIOChannelFactory.create();
+      return FileIOChannelFactory.fromOptions(null);
     }
 
     String scheme = matcher.group("scheme");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 2b08fee..637e8e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -34,9 +34,11 @@ import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.lang.reflect.WildcardType;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Queue;
+import java.util.ServiceLoader;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 /**
  * Utilities for working with with {@link Class Classes} and {@link Method Methods}.
@@ -167,6 +169,26 @@ public class ReflectHelpers {
     }
   };
 
+  /**
+   * A {@link Comparator} that orders objects by the canonical names of their classes.
+   *
+   * <p>Instances of the same class compare as equal. Classes without a canonical name
+   * (e.g. anonymous or local classes) are compared by their binary name instead.
+   */
+  public static class ObjectsClassComparator implements Comparator<Object> {
+    public static final ObjectsClassComparator INSTANCE = new ObjectsClassComparator();
+    @Override
+    public int compare(Object o1, Object o2) {
+      return classNameOf(o1).compareTo(classNameOf(o2));
+    }
+
+    /** Returns the canonical name of the object's class, or its binary name if it has none. */
+    private static String classNameOf(Object o) {
+      String canonicalName = o.getClass().getCanonicalName();
+      return canonicalName != null ? canonicalName : o.getClass().getName();
+    }
+  }
+
   /**
    * Returns all the methods visible from the provided interfaces.
    *
@@ -203,4 +225,21 @@ public class ReflectHelpers {
     }
     return builder.build();
   }
+
+  /**
+   * Finds the {@code ClassLoader} to be used by {@link ServiceLoader#load} calls. The
+   * current thread's context {@code ClassLoader} is preferred, but it can be null, so this
+   * falls back to the {@code ClassLoader} that loaded {@link ReflectHelpers} and finally to
+   * the system {@code ClassLoader}.
+   */
+  public static ClassLoader findClassLoader() {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = ReflectHelpers.class.getClassLoader();
+    }
+    if (classLoader == null) {
+      classLoader = ClassLoader.getSystemClassLoader();
+    }
+    return classLoader;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 1a07177..41a630f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -79,7 +79,7 @@ public class AvroIOTest {
 
   @BeforeClass
   public static void setupClass() {
-    IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
+    IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 5208910..dde5d02 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -462,7 +462,7 @@ public class FileBasedSourceTest {
                 new File(parent, "file1").getPath(),
                 new File(parent, "file2").getPath(),
                 new File(parent, "file3").getPath()));
-    IOChannelUtils.setIOFactory("mocked", mockIOFactory);
+    IOChannelUtils.setIOFactoryInternal("mocked", mockIOFactory, true /* override */);
 
     List<String> data2 = createStringDataset(3, 50);
     createFileWithData("file2", data2);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index dc71693..d3a5d5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -175,7 +175,7 @@ public class TextIOTest {
 
   @BeforeClass
   public static void setupClass() throws IOException {
-    IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
+    IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
     tempFolder = Files.createTempDirectory("TextIOTest");
     // empty files
     emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 0a2324f..7ff4a92 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -1461,40 +1461,6 @@ public class PipelineOptionsFactoryTest {
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
 
-  @Test
-  public void testFindProperClassLoaderIfContextClassLoaderIsNull() throws InterruptedException {
-    final ClassLoader[] classLoader = new ClassLoader[1];
-    Thread thread = new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        classLoader[0] = PipelineOptionsFactory.findClassLoader();
-      }
-    });
-    thread.setContextClassLoader(null);
-    thread.start();
-    thread.join();
-    assertEquals(PipelineOptionsFactory.class.getClassLoader(), classLoader[0]);
-  }
-
-  @Test
-  public void testFindProperClassLoaderIfContextClassLoaderIsAvailable()
-      throws InterruptedException {
-    final ClassLoader[] classLoader = new ClassLoader[1];
-    Thread thread = new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        classLoader[0] = PipelineOptionsFactory.findClassLoader();
-      }
-    });
-    ClassLoader cl = new ClassLoader() {};
-    thread.setContextClassLoader(cl);
-    thread.start();
-    thread.join();
-    assertEquals(cl, classLoader[0]);
-  }
-
   private static class RegisteredTestRunner extends PipelineRunner<PipelineResult> {
     public static PipelineRunner<PipelineResult> fromOptions(PipelineOptions options) {
       return new RegisteredTestRunner();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
index 4600d81..f8f53e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrarTest.java
@@ -33,8 +33,8 @@ public class FileIOChannelFactoryRegistrarTest {
 
   @Test
   public void testServiceLoader() {
-    for (IOChannelFactoryRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+    for (IOChannelFactoryRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
       if (registrar instanceof FileIOChannelFactoryRegistrar) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
index e27a043..38be65a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
@@ -46,7 +46,7 @@ import org.junit.runners.JUnit4;
 public class FileIOChannelFactoryTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private FileIOChannelFactory factory = FileIOChannelFactory.create();
+  private FileIOChannelFactory factory = FileIOChannelFactory.fromOptions(null);
 
   private void testCreate(Path path) throws Exception {
     String expected = "my test string";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
index 32bd4fc..a29dd45 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -33,8 +33,8 @@ public class GcsIOChannelFactoryRegistrarTest {
 
   @Test
   public void testServiceLoader() {
-    for (IOChannelFactoryRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+    for (IOChannelFactoryRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
       if (registrar instanceof GcsIOChannelFactoryRegistrar) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
index d92d3cd..6dfa4c7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
@@ -19,15 +19,19 @@ package org.apache.beam.sdk.util;
 
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -40,6 +44,9 @@ public class IOChannelUtilsTest {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   @Test
   public void testShardFormatExpansion() {
     assertEquals("output-001-of-123.txt",
@@ -106,4 +113,36 @@ public class IOChannelUtilsTest {
     assertEquals(expected,
         IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "aa", "bb", "cc"));
   }
+
+  @Test
+  public void testRegisterIOFactoriesAllowOverride() throws Exception {
+    IOChannelUtils.registerIOFactoriesAllowOverride(PipelineOptionsFactory.create());
+    IOChannelUtils.registerIOFactoriesAllowOverride(PipelineOptionsFactory.create());  // no throw
+    assertNotNull(IOChannelUtils.getFactory("gs"));
+    assertNotNull(IOChannelUtils.getFactory("file"));
+  }
+
+  @Test
+  public void testRegisterIOFactories() throws Exception {
+    IOChannelUtils.deregisterScheme("gs");  // start from an empty registration state
+    IOChannelUtils.deregisterScheme("file");
+
+    IOChannelUtils.registerIOFactories(PipelineOptionsFactory.create());
+    assertNotNull(IOChannelUtils.getFactory("gs"));
+    assertNotNull(IOChannelUtils.getFactory("file"));
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Failed to register IOChannelFactory");
+    thrown.expectMessage("override is not allowed");
+    IOChannelUtils.registerIOFactories(PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testCheckDuplicateScheme() throws Exception {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Scheme: [file] has conflicting registrars");
+    IOChannelUtils.checkDuplicateScheme(
+        Sets.<IOChannelFactoryRegistrar>newHashSet(
+            new FileIOChannelFactoryRegistrar(),
+            new FileIOChannelFactoryRegistrar()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
index 8a1708c..5fae25f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/ReflectHelpersTest.java
@@ -148,4 +148,37 @@ public class ReflectHelpersTest {
             Options.class.getMethod("getObject").getAnnotations()[0]));
   }
 
+  @Test
+  public void testFindProperClassLoaderIfContextClassLoaderIsNull() throws InterruptedException {
+    final ClassLoader[] classLoader = new ClassLoader[1];  // result written by the thread
+    Thread thread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        classLoader[0] = ReflectHelpers.findClassLoader();
+      }
+    });
+    thread.setContextClassLoader(null);  // forces the non-context fallback
+    thread.start();
+    thread.join();
+    assertEquals(ReflectHelpers.class.getClassLoader(), classLoader[0]);
+  }
+
+  @Test
+  public void testFindProperClassLoaderIfContextClassLoaderIsAvailable()
+      throws InterruptedException {
+    final ClassLoader[] classLoader = new ClassLoader[1];  // result written by the thread
+    Thread thread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        classLoader[0] = ReflectHelpers.findClassLoader();
+      }
+    });
+    ClassLoader cl = new ClassLoader() {};
+    thread.setContextClassLoader(cl);  // a non-null context loader must be preferred
+    thread.start();
+    thread.join();
+    assertEquals(cl, classLoader[0]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 51a69a2..40965e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1420,7 +1420,7 @@ public class BigQueryIOTest implements Serializable {
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation("mock://tempLocation");
 
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
@@ -1501,7 +1501,7 @@ public class BigQueryIOTest implements Serializable {
         eq(destinationTable.getDatasetId()),
         eq(destinationTable.getTableId())))
         .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
@@ -1584,7 +1584,7 @@ public class BigQueryIOTest implements Serializable {
         eq(destinationTable.getDatasetId()),
         eq(destinationTable.getTableId())))
         .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))