You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:38 UTC

[11/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
index 6c9a4cb..d8ed121 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
@@ -20,35 +20,25 @@ package org.apache.beam.runners.core;
 
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.collection.IsEmptyCollection.empty;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.ServiceLoader;
+import java.util.Map;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsMapContaining;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -56,25 +46,27 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link BoundedSourceRunner}. */
 @RunWith(JUnit4.class)
 public class BoundedSourceRunnerTest {
-
-  public static final String URN = "urn:org.apache.beam:source:java:0.1";
-
   @Test
   public void testRunReadLoopWithMultipleSources() throws Exception {
-    List<WindowedValue<Long>> out1Values = new ArrayList<>();
+    List<WindowedValue<Long>> out1ValuesA = new ArrayList<>();
+    List<WindowedValue<Long>> out1ValuesB = new ArrayList<>();
     List<WindowedValue<Long>> out2Values = new ArrayList<>();
-    Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
-        ImmutableList.of(out1Values::add, out2Values::add);
+    Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+        "out1", ImmutableList.of(out1ValuesA::add, out1ValuesB::add),
+        "out2", ImmutableList.of(out2Values::add));
 
-    BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
+    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+        new BoundedSourceRunner<>(
         PipelineOptionsFactory.create(),
         RunnerApi.FunctionSpec.getDefaultInstance(),
-        consumers);
+        outputMap);
 
     runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2)));
     runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1)));
 
-    assertThat(out1Values,
+    assertThat(out1ValuesA,
+        contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
+    assertThat(out1ValuesB,
         contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
     assertThat(out2Values,
         contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
@@ -82,106 +74,40 @@ public class BoundedSourceRunnerTest {
 
   @Test
   public void testRunReadLoopWithEmptySource() throws Exception {
-    List<WindowedValue<Long>> outValues = new ArrayList<>();
-    Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
-        ImmutableList.of(outValues::add);
+    List<WindowedValue<Long>> out1Values = new ArrayList<>();
+    Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+        "out1", ImmutableList.of(out1Values::add));
 
-    BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
+    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+        new BoundedSourceRunner<>(
         PipelineOptionsFactory.create(),
         RunnerApi.FunctionSpec.getDefaultInstance(),
-        consumers);
+        outputMap);
 
     runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0)));
 
-    assertThat(outValues, empty());
+    assertThat(out1Values, empty());
   }
 
   @Test
   public void testStart() throws Exception {
     List<WindowedValue<Long>> outValues = new ArrayList<>();
-    Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
-        ImmutableList.of(outValues::add);
+    Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+        "out", ImmutableList.of(outValues::add));
 
     ByteString encodedSource =
         ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3)));
 
-    BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
+    BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+        new BoundedSourceRunner<>(
         PipelineOptionsFactory.create(),
-        RunnerApi.FunctionSpec.newBuilder().setParameter(
+            RunnerApi.FunctionSpec.newBuilder().setParameter(
             Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(),
-        consumers);
+        outputMap);
 
     runner.start();
 
     assertThat(outValues,
         contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L)));
   }
-
-  @Test
-  public void testCreatingAndProcessingSourceFromFactory() throws Exception {
-    List<WindowedValue<String>> outputValues = new ArrayList<>();
-
-    Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
-    consumers.put("outputPC",
-        (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) outputValues::add);
-    List<ThrowingRunnable> startFunctions = new ArrayList<>();
-    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-
-    RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
-        .setUrn("urn:org.apache.beam:source:java:0.1")
-        .setParameter(Any.pack(BytesValue.newBuilder()
-            .setValue(ByteString.copyFrom(
-                SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
-            .build()))
-        .build();
-
-    RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
-        .setSpec(functionSpec)
-        .putInputs("input", "inputPC")
-        .putOutputs("output", "outputPC")
-        .build();
-
-    new BoundedSourceRunner.Factory<>().createRunnerForPTransform(
-        PipelineOptionsFactory.create(),
-        null /* beamFnDataClient */,
-        "pTransformId",
-        pTransform,
-        Suppliers.ofInstance("57L")::get,
-        ImmutableMap.of(),
-        ImmutableMap.of(),
-        consumers,
-        startFunctions::add,
-        finishFunctions::add);
-
-    // This is testing a deprecated way of running sources and should be removed
-    // once all source definitions are instead propagated along the input edge.
-    Iterables.getOnlyElement(startFunctions).run();
-    assertThat(outputValues, contains(
-        valueInGlobalWindow(0L),
-        valueInGlobalWindow(1L),
-        valueInGlobalWindow(2L)));
-    outputValues.clear();
-
-    // Check that when passing a source along as an input, the source is processed.
-    assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
-    Iterables.getOnlyElement(consumers.get("inputPC")).accept(
-        valueInGlobalWindow(CountingSource.upTo(2)));
-    assertThat(outputValues, contains(
-        valueInGlobalWindow(0L),
-        valueInGlobalWindow(1L)));
-
-    assertThat(finishFunctions, Matchers.empty());
-  }
-
-  @Test
-  public void testRegistration() {
-    for (Registrar registrar :
-        ServiceLoader.load(Registrar.class)) {
-      if (registrar instanceof BoundedSourceRunner.Registrar) {
-        assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
-        return;
-      }
-    }
-    fail("Expected registrar not found.");
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
deleted file mode 100644
index c4df77a..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow;
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.Message;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
-import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
-public class FnApiDoFnRunnerTest {
-
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-  private static final Coder<WindowedValue<String>> STRING_CODER =
-      WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
-  private static final String STRING_CODER_SPEC_ID = "999L";
-  private static final RunnerApi.Coder STRING_CODER_SPEC;
-
-  static {
-    try {
-      STRING_CODER_SPEC = RunnerApi.Coder.newBuilder()
-          .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
-              .setSpec(RunnerApi.FunctionSpec.newBuilder()
-                  .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-                      OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER))))
-                      .build())))
-              .build())
-          .build();
-    } catch (IOException e) {
-      throw new ExceptionInInitializerError(e);
-    }
-  }
-
-  private static class TestDoFn extends DoFn<String, String> {
-    private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
-    private static final TupleTag<String> additionalOutput = new TupleTag<>("output");
-
-    private BoundedWindow window;
-
-    @ProcessElement
-    public void processElement(ProcessContext context, BoundedWindow window) {
-      context.output("MainOutput" + context.element());
-      context.output(additionalOutput, "AdditionalOutput" + context.element());
-      this.window = window;
-    }
-
-    @FinishBundle
-    public void finishBundle(FinishBundleContext context) {
-      if (window != null) {
-        context.output("FinishBundle", window.maxTimestamp(), window);
-        window = null;
-      }
-    }
-  }
-
-  /**
-   * Create a DoFn that has 3 inputs (inputATarget1, inputATarget2, inputBTarget) and 2 outputs
-   * (mainOutput, output). Validate that inputs are fed to the {@link DoFn} and that outputs
-   * are directed to the correct consumers.
-   */
-  @Test
-  public void testCreatingAndProcessingDoFn() throws Exception {
-    Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
-    String pTransformId = "pTransformId";
-    String mainOutputId = "101";
-    String additionalOutputId = "102";
-
-    DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn(
-        new TestDoFn(),
-        WindowingStrategy.globalDefault(),
-        ImmutableList.of(),
-        StringUtf8Coder.of(),
-        Long.parseLong(mainOutputId),
-        ImmutableMap.of(
-            Long.parseLong(mainOutputId), TestDoFn.mainOutput,
-            Long.parseLong(additionalOutputId), TestDoFn.additionalOutput));
-    RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
-        .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
-        .setParameter(Any.pack(BytesValue.newBuilder()
-            .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
-            .build()))
-        .build();
-    RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
-        .setSpec(functionSpec)
-        .putInputs("inputA", "inputATarget")
-        .putInputs("inputB", "inputBTarget")
-        .putOutputs(mainOutputId, "mainOutputTarget")
-        .putOutputs(additionalOutputId, "additionalOutputTarget")
-        .build();
-
-    List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
-    List<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
-    Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
-    consumers.put("mainOutputTarget",
-        (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) mainOutputValues::add);
-    consumers.put("additionalOutputTarget",
-        (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) additionalOutputValues::add);
-    List<ThrowingRunnable> startFunctions = new ArrayList<>();
-    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-
-    new FnApiDoFnRunner.Factory<>().createRunnerForPTransform(
-        PipelineOptionsFactory.create(),
-        null /* beamFnDataClient */,
-        pTransformId,
-        pTransform,
-        Suppliers.ofInstance("57L")::get,
-        ImmutableMap.of(),
-        ImmutableMap.of(),
-        consumers,
-        startFunctions::add,
-        finishFunctions::add);
-
-    Iterables.getOnlyElement(startFunctions).run();
-    mainOutputValues.clear();
-
-    assertThat(consumers.keySet(), containsInAnyOrder(
-        "inputATarget", "inputBTarget", "mainOutputTarget", "additionalOutputTarget"));
-
-    Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A1"));
-    Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A2"));
-    Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("B"));
-    assertThat(mainOutputValues, contains(
-        valueInGlobalWindow("MainOutputA1"),
-        valueInGlobalWindow("MainOutputA2"),
-        valueInGlobalWindow("MainOutputB")));
-    assertThat(additionalOutputValues, contains(
-        valueInGlobalWindow("AdditionalOutputA1"),
-        valueInGlobalWindow("AdditionalOutputA2"),
-        valueInGlobalWindow("AdditionalOutputB")));
-    mainOutputValues.clear();
-    additionalOutputValues.clear();
-
-    Iterables.getOnlyElement(finishFunctions).run();
-    assertThat(
-        mainOutputValues,
-        contains(
-            timestampedValueInGlobalWindow("FinishBundle", GlobalWindow.INSTANCE.maxTimestamp())));
-    mainOutputValues.clear();
-  }
-
-  @Test
-  public void testRegistration() {
-    for (Registrar registrar :
-        ServiceLoader.load(Registrar.class)) {
-      if (registrar instanceof FnApiDoFnRunner.Registrar) {
-        assertThat(registrar.getPTransformRunnerFactories(),
-            IsMapContaining.hasKey(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN));
-        return;
-      }
-    }
-    fail("Expected registrar not found.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml
deleted file mode 100644
index 8da9448..0000000
--- a/sdks/java/io/amqp/pom.xml
+++ /dev/null
@@ -1,100 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-sdks-java-io-amqp</artifactId>
-  <name>Apache Beam :: SDKs :: Java :: IO :: AMQP</name>
-  <description>IO to read and write using AMQP 1.0 protocol (http://www.amqp.org).</description>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.qpid</groupId>
-      <artifactId>proton-j</artifactId>
-      <version>0.13.1</version>
-    </dependency>
-
-    <!-- compile dependencies -->
-    <dependency>
-      <groupId>com.google.auto.value</groupId>
-      <artifactId>auto-value</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <!-- test dependencies -->
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
deleted file mode 100644
index 1f307b2..0000000
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Joiner;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.Tracker;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * AmqpIO supports AMQP 1.0 protocol using the Apache QPid Proton-J library.
- *
- * <p>It's also possible to use AMQP 1.0 protocol via Apache Qpid JMS connection factory and the
- * Apache Beam JmsIO.
- *
- * <h3>Binding AMQP and receive messages</h3>
- *
- * <p>The {@link AmqpIO} {@link Read} can bind a AMQP listener endpoint and receive messages. It can
- * also connect to a AMPQ broker (such as Apache Qpid or Apache ActiveMQ).
- *
- * <p>{@link AmqpIO} {@link Read} returns an unbounded {@link PCollection} of {@link Message}
- * containing the received messages.
- *
- * <p>To configure a AMQP source, you have to provide a list of addresses where it will receive
- * messages. An address has the following form: {@code
- * [amqp[s]://][user[:password]@]domain[/[name]]} where {@code domain} can be one of {@code
- * host | host:port | ip | ip:port | name}. NB: the {@code ~} character allows to bind a AMQP
- * listener instead of connecting to a remote broker. For instance {@code amqp://~0.0.0.0:1234}
- * will bind a AMQP listener on any network interface on the 1234 port number.
- *
- * <p>The following example illustrates how to configure a AMQP source:
- *
- * <pre>{@code
- *
- *  pipeline.apply(AmqpIO.read()
- *    .withAddresses(Collections.singletonList("amqp://host:1234")))
- *
- * }</pre>
- *
- * <h3>Sending messages to a AMQP endpoint</h3>
- *
- * <p>{@link AmqpIO} provides a sink to send {@link PCollection} elements as messages.
- *
- * <p>As for the {@link Read}, {@link AmqpIO} {@link Write} requires a list of addresses where to
- * send messages. The following example illustrates how to configure the {@link AmqpIO}
- * {@link Write}:
- *
- * <pre>{@code
- *
- *  pipeline
- *    .apply(...) // provide PCollection<Message>
- *    .apply(AmqpIO.write());
- *
- * }</pre>
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class AmqpIO {
-
-  public static Read read() {
-    return new AutoValue_AmqpIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build();
-  }
-
-  public static Write write() {
-    return new AutoValue_AmqpIO_Write();
-  }
-
-  private AmqpIO() {
-  }
-
-  /**
-   * A {@link PTransform} to read/receive messages using AMQP 1.0 protocol.
-   */
-  @AutoValue
-  public abstract static class Read extends PTransform<PBegin, PCollection<Message>> {
-
-    @Nullable abstract List<String> addresses();
-    abstract long maxNumRecords();
-    @Nullable abstract Duration maxReadTime();
-
-    abstract Builder builder();
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setAddresses(List<String> addresses);
-      abstract Builder setMaxNumRecords(long maxNumRecords);
-      abstract Builder setMaxReadTime(Duration maxReadTime);
-      abstract Read build();
-    }
-
-    /**
-     * Define the AMQP addresses where to receive messages.
-     */
-    public Read withAddresses(List<String> addresses) {
-      checkArgument(addresses != null, "AmqpIO.read().withAddresses(addresses) called with null"
-          + " addresses");
-      checkArgument(!addresses.isEmpty(), "AmqpIO.read().withAddresses(addresses) called with "
-          + "empty addresses list");
-      return builder().setAddresses(addresses).build();
-    }
-
-    /**
-     * Define the max number of records received by the {@link Read}.
-     * When the max number of records is lower than {@code Long.MAX_VALUE}, the {@link Read} will
-     * provide a bounded {@link PCollection}.
-     */
-    public Read withMaxNumRecords(long maxNumRecords) {
-      checkArgument(maxReadTime() == null,
-          "maxNumRecord and maxReadTime are exclusive");
-      return builder().setMaxNumRecords(maxNumRecords).build();
-    }
-
-    /**
-     * Define the max read time (duration) while the {@link Read} will receive messages.
-     * When this max read time is not null, the {@link Read} will provide a bounded
-     * {@link PCollection}.
-     */
-    public Read withMaxReadTime(Duration maxReadTime) {
-      checkArgument(maxNumRecords() == Long.MAX_VALUE,
-          "maxNumRecord and maxReadTime are exclusive");
-      return builder().setMaxReadTime(maxReadTime).build();
-    }
-
-    @Override
-    public void validate(PipelineOptions pipelineOptions) {
-      checkState(addresses() != null, "AmqIO.read() requires addresses list to be set via "
-          + "withAddresses(addresses)");
-      checkState(!addresses().isEmpty(), "AmqIO.read() requires a non-empty addresses list to be"
-          + " set via withAddresses(addresses)");
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(DisplayData.item("addresses", Joiner.on(" ").join(addresses())));
-    }
-
-    @Override
-    public PCollection<Message> expand(PBegin input) {
-      org.apache.beam.sdk.io.Read.Unbounded<Message> unbounded =
-          org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this));
-
-      PTransform<PBegin, PCollection<Message>> transform = unbounded;
-
-      if (maxNumRecords() != Long.MAX_VALUE) {
-        transform = unbounded.withMaxNumRecords(maxNumRecords());
-      } else if (maxReadTime() != null) {
-        transform = unbounded.withMaxReadTime(maxReadTime());
-      }
-
-      return input.getPipeline().apply(transform);
-    }
-
-  }
-
-  private static class AmqpCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
-
-    private transient Messenger messenger;
-    private transient List<Tracker> trackers = new ArrayList<>();
-
-    public AmqpCheckpointMark() {
-    }
-
-    @Override
-    public void finalizeCheckpoint() {
-      for (Tracker tracker : trackers) {
-        // flag as not cumulative
-        messenger.accept(tracker, 0);
-      }
-      trackers.clear();
-    }
-
-    // set an empty list to messages when deserialize
-    private void readObject(java.io.ObjectInputStream stream)
-        throws java.io.IOException, ClassNotFoundException {
-      trackers = new ArrayList<>();
-    }
-
-  }
-
-  private static class UnboundedAmqpSource
-      extends UnboundedSource<Message, AmqpCheckpointMark> {
-
-    private final Read spec;
-
-    public UnboundedAmqpSource(Read spec) {
-      this.spec = spec;
-    }
-
-    @Override
-    public List<UnboundedAmqpSource> split(int desiredNumSplits,
-                                                           PipelineOptions pipelineOptions) {
-      // amqp is a queue system, so, it's possible to have multiple concurrent sources, even if
-      // they bind the listener
-      List<UnboundedAmqpSource> sources = new ArrayList<>();
-      for (int i = 0; i < Math.max(1, desiredNumSplits); ++i) {
-        sources.add(new UnboundedAmqpSource(spec));
-      }
-      return sources;
-    }
-
-    @Override
-    public UnboundedReader<Message> createReader(PipelineOptions pipelineOptions,
-                                                AmqpCheckpointMark checkpointMark) {
-      return new UnboundedAmqpReader(this, checkpointMark);
-    }
-
-    @Override
-    public Coder<Message> getDefaultOutputCoder() {
-      return new AmqpMessageCoder();
-    }
-
-    @Override
-    public Coder<AmqpCheckpointMark> getCheckpointMarkCoder() {
-      return SerializableCoder.of(AmqpCheckpointMark.class);
-    }
-
-    @Override
-    public void validate() {
-      spec.validate(null);
-    }
-
-  }
-
-  private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader<Message> {
-
-    private final UnboundedAmqpSource source;
-
-    private Messenger messenger;
-    private Message current;
-    private Instant currentTimestamp;
-    private Instant watermark = new Instant(Long.MIN_VALUE);
-    private AmqpCheckpointMark checkpointMark;
-
-    public UnboundedAmqpReader(UnboundedAmqpSource source, AmqpCheckpointMark checkpointMark) {
-      this.source = source;
-      this.current = null;
-      if (checkpointMark != null) {
-        this.checkpointMark = checkpointMark;
-      } else {
-        this.checkpointMark = new AmqpCheckpointMark();
-      }
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return watermark;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() {
-      if (current == null) {
-        throw new NoSuchElementException();
-      }
-      return currentTimestamp;
-    }
-
-    @Override
-    public Message getCurrent() {
-      if (current == null) {
-        throw new NoSuchElementException();
-      }
-      return current;
-    }
-
-    @Override
-    public UnboundedSource.CheckpointMark getCheckpointMark() {
-      return checkpointMark;
-    }
-
-    @Override
-    public UnboundedAmqpSource getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      Read spec = source.spec;
-      messenger = Messenger.Factory.create();
-      messenger.start();
-      for (String address : spec.addresses()) {
-        messenger.subscribe(address);
-      }
-      checkpointMark.messenger = messenger;
-      return advance();
-    }
-
-    @Override
-    public boolean advance() {
-      messenger.recv();
-      if (messenger.incoming() <= 0) {
-        current = null;
-        return false;
-      }
-      Message message = messenger.get();
-      Tracker tracker = messenger.incomingTracker();
-      checkpointMark.trackers.add(tracker);
-      currentTimestamp = new Instant(message.getCreationTime());
-      watermark = currentTimestamp;
-      current = message;
-      return true;
-    }
-
-    @Override
-    public void close() {
-      if (messenger != null) {
-        messenger.stop();
-      }
-    }
-
-  }
-
-  /**
-   * A {@link PTransform} to send messages using AMQP 1.0 protocol.
-   */
-  @AutoValue
-  public abstract static class Write extends PTransform<PCollection<Message>, PDone> {
-
-    @Override
-    public PDone expand(PCollection<Message> input) {
-      input.apply(ParDo.of(new WriteFn(this)));
-      return PDone.in(input.getPipeline());
-    }
-
-    private static class WriteFn extends DoFn<Message, Void> {
-
-      private final Write spec;
-
-      private transient Messenger messenger;
-
-      public WriteFn(Write spec) {
-        this.spec = spec;
-      }
-
-      @Setup
-      public void setup() throws Exception {
-        messenger = Messenger.Factory.create();
-        messenger.start();
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext processContext) throws Exception {
-        Message message = processContext.element();
-        messenger.put(message);
-        messenger.send();
-      }
-
-      @Teardown
-      public void teardown() throws Exception {
-        if (messenger != null) {
-          messenger.stop();
-        }
-      }
-
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
deleted file mode 100644
index 5a55260..0000000
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import com.google.common.io.ByteStreams;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.BufferOverflowException;
-
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.util.VarInt;
-import org.apache.qpid.proton.message.Message;
-
-/**
- * A coder for AMQP message.
- */
-public class AmqpMessageCoder extends CustomCoder<Message> {
-
-  private static final int[] MESSAGE_SIZES = new int[]{
-      8 * 1024,
-      64 * 1024,
-      1 * 1024 * 1024,
-      64 * 1024 * 1024
-  };
-
-  static AmqpMessageCoder of() {
-    return new AmqpMessageCoder();
-  }
-
-  @Override
-  public void encode(Message value, OutputStream outStream) throws CoderException, IOException {
-    for (int maxMessageSize : MESSAGE_SIZES) {
-      try {
-        encode(value, outStream, maxMessageSize);
-        return;
-      } catch (Exception e) {
-        continue;
-      }
-    }
-    throw new CoderException("Message is larger than the max size supported by the coder");
-  }
-
-  private void encode(Message value, OutputStream outStream, int messageSize) throws
-      IOException, BufferOverflowException {
-    byte[] data = new byte[messageSize];
-    int bytesWritten = value.encode(data, 0, data.length);
-    VarInt.encode(bytesWritten, outStream);
-    outStream.write(data, 0, bytesWritten);
-  }
-
-  @Override
-  public Message decode(InputStream inStream) throws CoderException, IOException {
-    Message message = Message.Factory.create();
-    int bytesToRead = VarInt.decodeInt(inStream);
-    byte[] encodedMessage = new byte[bytesToRead];
-    ByteStreams.readFully(inStream, encodedMessage);
-    message.decode(encodedMessage, 0, encodedMessage.length);
-    return message;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
deleted file mode 100644
index bc3445c..0000000
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviderRegistrar;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.qpid.proton.message.Message;
-
-/**
- * A {@link CoderProviderRegistrar} for standard types used with {@link AmqpIO}.
- */
-@AutoService(CoderProviderRegistrar.class)
-public class AmqpMessageCoderProviderRegistrar implements CoderProviderRegistrar {
-
-  @Override
-  public List<CoderProvider> getCoderProviders() {
-    return ImmutableList.of(
-        CoderProviders.forCoder(TypeDescriptor.of(Message.class),
-            AmqpMessageCoder.of()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
deleted file mode 100644
index 091f234..0000000
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing using AMQP 1.0 protocol.
- */
-package org.apache.beam.sdk.io.amqp;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
deleted file mode 100644
index c8fe4e8..0000000
--- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tests on {@link AmqpIO}.
- */
-@RunWith(JUnit4.class)
-public class AmqpIOTest {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class);
-
-  private int port;
-
-  @Rule public TestPipeline pipeline = TestPipeline.create();
-
-  @Before
-  public void findFreeNetworkPort() throws Exception {
-    LOG.info("Finding free network port");
-    ServerSocket socket = new ServerSocket(0);
-    port = socket.getLocalPort();
-    socket.close();
-  }
-
-  @Test
-  public void testRead() throws Exception {
-    PCollection<Message> output = pipeline.apply(AmqpIO.read()
-        .withMaxNumRecords(100)
-        .withAddresses(Collections.singletonList("amqp://~localhost:" + port)));
-    PAssert.thatSingleton(output.apply(Count.<Message>globally())).isEqualTo(100L);
-
-    Thread sender = new Thread() {
-      public void run() {
-        try {
-          Thread.sleep(500);
-          Messenger sender = Messenger.Factory.create();
-          sender.start();
-          for (int i = 0; i < 100; i++) {
-            Message message = Message.Factory.create();
-            message.setAddress("amqp://localhost:" + port);
-            message.setBody(new AmqpValue("Test " + i));
-            sender.put(message);
-            sender.send();
-          }
-          sender.stop();
-        } catch (Exception e) {
-          LOG.error("Sender error", e);
-        }
-      }
-    };
-    try {
-      sender.start();
-      pipeline.run();
-    } finally {
-      sender.join();
-    }
-  }
-
-  @Test
-  public void testWrite() throws Exception {
-    final List<String> received = new ArrayList<>();
-    Thread receiver = new Thread() {
-      @Override
-      public void run() {
-        try {
-          Messenger messenger = Messenger.Factory.create();
-          messenger.start();
-          messenger.subscribe("amqp://~localhost:" + port);
-          while (received.size() < 100) {
-            messenger.recv();
-            while (messenger.incoming() > 0) {
-              Message message = messenger.get();
-              LOG.info("Received: " + message.getBody().toString());
-              received.add(message.getBody().toString());
-            }
-          }
-          messenger.stop();
-        } catch (Exception e) {
-          LOG.error("Receiver error", e);
-        }
-      }
-    };
-    LOG.info("Starting AMQP receiver");
-    receiver.start();
-
-    List<Message> data = new ArrayList<>();
-    for (int i = 0; i < 100; i++) {
-      Message message = Message.Factory.create();
-      message.setBody(new AmqpValue("Test " + i));
-      message.setAddress("amqp://localhost:" + port);
-      message.setSubject("test");
-      data.add(message);
-    }
-    pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write());
-    LOG.info("Starting pipeline");
-    try {
-      pipeline.run();
-    } finally {
-      LOG.info("Join receiver thread");
-      receiver.join();
-    }
-
-    assertEquals(100, received.size());
-    for (int i = 0; i < 100; i++) {
-      assertTrue(received.contains("AmqpValue{Test " + i + "}"));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
deleted file mode 100644
index 7a8efeb..0000000
--- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.base.Joiner;
-
-import java.util.Collections;
-
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.message.Message;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test on {@link AmqpMessageCoder}.
- */
-@RunWith(JUnit4.class)
-public class AmqpMessageCoderTest {
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void encodeDecode() throws Exception {
-    Message message = Message.Factory.create();
-    message.setBody(new AmqpValue("body"));
-    message.setAddress("address");
-    message.setSubject("test");
-    AmqpMessageCoder coder = AmqpMessageCoder.of();
-
-    Message clone = CoderUtils.clone(coder, message);
-
-    assertEquals("AmqpValue{body}", clone.getBody().toString());
-    assertEquals("address", clone.getAddress());
-    assertEquals("test", clone.getSubject());
-  }
-
-  @Test
-  public void encodeDecodeTooMuchLargerMessage() throws Exception {
-    thrown.expect(CoderException.class);
-    Message message = Message.Factory.create();
-    message.setAddress("address");
-    message.setSubject("subject");
-    String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " "));
-    message.setBody(new AmqpValue(body));
-
-    AmqpMessageCoder coder = AmqpMessageCoder.of();
-
-    byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
-  }
-
-  @Test
-  public void encodeDecodeLargeMessage() throws Exception {
-    Message message = Message.Factory.create();
-    message.setAddress("address");
-    message.setSubject("subject");
-    String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
-    message.setBody(new AmqpValue(body));
-
-    AmqpMessageCoder coder = AmqpMessageCoder.of();
-
-    Message clone = CoderUtils.clone(coder, message);
-
-    clone.getBody().toString().equals(message.getBody().toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml
index c74477e..8249f57 100644
--- a/sdks/java/io/cassandra/pom.xml
+++ b/sdks/java/io/cassandra/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 32905b7..b6f4ef6 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -82,7 +82,7 @@ import org.slf4j.LoggerFactory;
  *        .withEntity(Person.class));
  * }</pre>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class CassandraIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(CassandraIO.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index df0d94b..f7525fd 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-parent</artifactId>
-        <version>2.2.0-SNAPSHOT</version>
+        <version>2.1.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 25ab929..387fd22 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -71,7 +71,11 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   Integer getElasticsearchHttpPort();
   void setElasticsearchHttpPort(Integer value);
 
-  /* Cassandra */
+  @Description("Tcp port for elasticsearch server")
+  @Default.Integer(9300)
+  Integer getElasticsearchTcpPort();
+  void setElasticsearchTcpPort(Integer value);
+
   @Description("Host for Cassandra server (host name/ip address)")
   @Default.String("cassandra-host")
   String getCassandraHost();

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index e0a7f21..03632ce 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -137,14 +137,6 @@
       <scope>test</scope>
     </dependency>
 
-    <!-- This optional dependency is used by the test framework. Avoids a warning -->
-    <dependency>
-      <groupId>net.java.dev.jna</groupId>
-      <artifactId>jna</artifactId>
-      <version>4.1.0</version>
-      <scope>test</scope>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-direct-java</artifactId>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 4d76887..f6ceef2 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -113,7 +113,7 @@ import org.elasticsearch.client.RestClientBuilder;
  * <p>Optionally, you can provide {@code withBatchSize()} and {@code withBatchSizeBytes()}
  * to specify the size of the write batch in number of documents or in bytes.
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class ElasticsearchIO {
 
   public static Read read() {
@@ -139,7 +139,7 @@ public class ElasticsearchIO {
 
   private static final ObjectMapper mapper = new ObjectMapper();
 
-  static JsonNode parseResponse(Response response) throws IOException {
+  private static JsonNode parseResponse(Response response) throws IOException {
     return mapper.readValue(response.getEntity().getContent(), JsonNode.class);
   }
 
@@ -264,7 +264,7 @@ public class ElasticsearchIO {
       builder.addIfNotNull(DisplayData.item("username", getUsername()));
     }
 
-    RestClient createClient() throws MalformedURLException {
+    private RestClient createClient() throws MalformedURLException {
       HttpHost[] hosts = new HttpHost[getAddresses().size()];
       int i = 0;
       for (String address : getAddresses()) {
@@ -455,7 +455,16 @@ public class ElasticsearchIO {
       while (shards.hasNext()) {
         Map.Entry<String, JsonNode> shardJson = shards.next();
         String shardId = shardJson.getKey();
-        sources.add(new BoundedElasticsearchSource(spec, shardId));
+        JsonNode value = (JsonNode) shardJson.getValue();
+        boolean isPrimaryShard =
+            value
+                .path(0)
+                .path("routing")
+                .path("primary")
+                .asBoolean();
+        if (isPrimaryShard) {
+          sources.add(new BoundedElasticsearchSource(spec, shardId));
+        }
       }
       checkArgument(!sources.isEmpty(), "No primary shard found");
       return sources;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index 203963d..b0d161f 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -17,17 +17,19 @@
  */
 package org.apache.beam.sdk.io.elasticsearch;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import org.apache.http.HttpEntity;
-import org.apache.http.entity.ContentType;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.nio.entity.NStringEntity;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestClient;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.index.IndexNotFoundException;
 
 /** Test utilities to use with {@link ElasticsearchIO}. */
 class ElasticSearchIOTestUtils {
@@ -39,68 +41,57 @@ class ElasticSearchIOTestUtils {
   }
 
   /** Deletes the given index synchronously. */
-  static void deleteIndex(String index, RestClient restClient) throws IOException {
-    try {
-      restClient.performRequest("DELETE", String.format("/%s", index), new BasicHeader("", ""));
-    } catch (IOException e) {
-      // it is fine to ignore this expression as deleteIndex occurs in @before,
-      // so when the first tests is run, the index does not exist yet
-      if (!e.getMessage().contains("index_not_found_exception")){
-        throw e;
-      }
+  static void deleteIndex(String index, Client client) throws Exception {
+    IndicesAdminClient indices = client.admin().indices();
+    IndicesExistsResponse indicesExistsResponse =
+        indices.exists(new IndicesExistsRequest(index)).get();
+    if (indicesExistsResponse.isExists()) {
+      indices.prepareClose(index).get();
+      indices.delete(Requests.deleteIndexRequest(index)).get();
     }
   }
 
   /** Inserts the given number of test documents into Elasticsearch. */
-  static void insertTestDocuments(String index, String type, long numDocs, RestClient restClient)
-      throws IOException {
+  static void insertTestDocuments(String index, String type, long numDocs, Client client)
+      throws Exception {
+    final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefresh(true);
     List<String> data =
         ElasticSearchIOTestUtils.createDocuments(
             numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
-    StringBuilder bulkRequest = new StringBuilder();
     for (String document : data) {
-      bulkRequest.append(String.format("{ \"index\" : {} }%n%s%n", document));
+      bulkRequestBuilder.add(client.prepareIndex(index, type, null).setSource(document));
     }
-    String endPoint = String.format("/%s/%s/_bulk", index, type);
-    HttpEntity requestBody =
-        new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
-    Response response = restClient.performRequest("POST", endPoint,
-        Collections.singletonMap("refresh", "true"), requestBody,
-        new BasicHeader("", ""));
-    JsonNode searchResult = ElasticsearchIO.parseResponse(response);
-    boolean errors = searchResult.path("errors").asBoolean();
-    if (errors){
+    final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
+    if (bulkResponse.hasFailures()) {
       throw new IOException(
-          String.format("Failed to insert test documents in index %s", index));
+          String.format(
+              "Cannot insert test documents in index %s : %s",
+              index, bulkResponse.buildFailureMessage()));
     }
   }
 
   /**
-   * Forces a refresh of the given index to make recently inserted documents available for search.
+   * Forces an upgrade of the given index to make recently inserted documents available for search.
    *
    * @return The number of docs in the index
    */
-  static long refreshIndexAndGetCurrentNumDocs(String index, String type, RestClient restClient)
-      throws IOException {
-    long result = 0;
+  static long upgradeIndexAndGetCurrentNumDocs(String index, String type, Client client) {
     try {
-      String endPoint = String.format("/%s/_refresh", index);
-      restClient.performRequest("POST", endPoint, new BasicHeader("", ""));
-
-      endPoint = String.format("/%s/%s/_search", index, type);
-      Response response = restClient.performRequest("GET", endPoint, new BasicHeader("", ""));
-      JsonNode searchResult = ElasticsearchIO.parseResponse(response);
-      result = searchResult.path("hits").path("total").asLong();
-    } catch (IOException e) {
+      client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet();
+      SearchResponse response =
+          client.prepareSearch(index).setTypes(type).execute().actionGet(5000);
+      return response.getHits().getTotalHits();
       // it is fine to ignore bellow exceptions because in testWriteWithBatchSize* sometimes,
       // we call upgrade before any doc have been written
       // (when there are fewer docs processed than batchSize).
       // In that cases index/type has not been created (created upon first doc insertion)
-      if (!e.getMessage().contains("index_not_found_exception")){
+    } catch (IndexNotFoundException e) {
+    } catch (java.lang.IllegalArgumentException e) {
+      if (!e.getMessage().contains("No search type")) {
         throw e;
       }
     }
-    return result;
+    return 0;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 7c37e87..2d6393a 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
-import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.transport.TransportClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ElasticsearchIOIT {
   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOIT.class);
-  private static  RestClient restClient;
+  private static TransportClient client;
   private static IOTestPipelineOptions options;
   private static ElasticsearchIO.ConnectionConfiguration readConnectionConfiguration;
   @Rule public TestPipeline pipeline = TestPipeline.create();
@@ -66,16 +66,16 @@ public class ElasticsearchIOIT {
   public static void beforeClass() throws Exception {
     PipelineOptionsFactory.register(IOTestPipelineOptions.class);
     options = TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+    client = ElasticsearchTestDataSet.getClient(options);
     readConnectionConfiguration =
         ElasticsearchTestDataSet.getConnectionConfiguration(
             options, ElasticsearchTestDataSet.ReadOrWrite.READ);
-    restClient = readConnectionConfiguration.createClient();
   }
 
   @AfterClass
   public static void afterClass() throws Exception {
-    ElasticsearchTestDataSet.deleteIndex(restClient, ElasticsearchTestDataSet.ReadOrWrite.WRITE);
-    restClient.close();
+    ElasticsearchTestDataSet.deleteIndex(client, ElasticsearchTestDataSet.ReadOrWrite.WRITE);
+    client.close();
   }
 
   @Test
@@ -128,8 +128,8 @@ public class ElasticsearchIOIT {
     pipeline.run();
 
     long currentNumDocs =
-        ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
-            ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, restClient);
+        ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+            ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, client);
     assertEquals(ElasticsearchTestDataSet.NUM_DOCS, currentNumDocs);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index b349a29..260af79 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -39,11 +39,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.values.PCollection;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
 import org.hamcrest.CustomMatcher;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -74,10 +74,9 @@ public class ElasticsearchIOTest implements Serializable {
   private static final long BATCH_SIZE_BYTES = 2048L;
 
   private static Node node;
-  private static RestClient restClient;
   private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration;
 
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
 
@@ -92,8 +91,8 @@ public class ElasticsearchIOTest implements Serializable {
             .put("cluster.name", "beam")
             .put("http.enabled", "true")
             .put("node.data", "true")
-            .put("path.data", TEMPORARY_FOLDER.getRoot().getPath())
-            .put("path.home", TEMPORARY_FOLDER.getRoot().getPath())
+            .put("path.data", folder.getRoot().getPath())
+            .put("path.home", folder.getRoot().getPath())
             .put("node.name", "beam")
             .put("network.host", ES_IP)
             .put("http.port", esHttpPort)
@@ -101,29 +100,27 @@ public class ElasticsearchIOTest implements Serializable {
             // had problems with some jdk, embedded ES was too slow for bulk insertion,
             // and queue of 50 was full. No pb with real ES instance (cf testWrite integration test)
             .put("threadpool.bulk.queue_size", 100);
-    node = new Node(settingsBuilder.build());
+    node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build();
     LOG.info("Elasticsearch node created");
     node.start();
     connectionConfiguration =
       ElasticsearchIO.ConnectionConfiguration.create(
         new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
-    restClient = connectionConfiguration.createClient();
   }
 
   @AfterClass
-  public static void afterClass() throws IOException{
-    restClient.close();
+  public static void afterClass() {
     node.close();
   }
 
   @Before
   public void before() throws Exception {
-    ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, restClient);
+    ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client());
   }
 
   @Test
   public void testSizes() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
     PipelineOptions options = PipelineOptionsFactory.create();
     ElasticsearchIO.Read read =
         ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
@@ -137,7 +134,7 @@ public class ElasticsearchIOTest implements Serializable {
 
   @Test
   public void testRead() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
 
     PCollection<String> output =
         pipeline.apply(
@@ -153,7 +150,7 @@ public class ElasticsearchIOTest implements Serializable {
 
   @Test
   public void testReadWithQuery() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
 
     String query =
         "{\n"
@@ -188,7 +185,7 @@ public class ElasticsearchIOTest implements Serializable {
     pipeline.run();
 
     long currentNumDocs =
-        ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
+        ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client());
     assertEquals(NUM_DOCS, currentNumDocs);
 
     QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("Einstein").field("scientist");
@@ -261,8 +258,9 @@ public class ElasticsearchIOTest implements Serializable {
       if ((numDocsProcessed % 100) == 0) {
         // force the index to upgrade after inserting for the inserted docs
         // to be searchable immediately
-        long currentNumDocs = ElasticSearchIOTestUtils
-            .refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
+        long currentNumDocs =
+            ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+                ES_INDEX, ES_TYPE, node.client());
         if ((numDocsProcessed % BATCH_SIZE) == 0) {
           /* bundle end */
           assertEquals(
@@ -306,8 +304,8 @@ public class ElasticsearchIOTest implements Serializable {
         // force the index to upgrade after inserting for the inserted docs
         // to be searchable immediately
         long currentNumDocs =
-            ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
-                ES_INDEX, ES_TYPE, restClient);
+            ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+                ES_INDEX, ES_TYPE, node.client());
         if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) {
           /* bundle end */
           assertThat(
@@ -329,7 +327,7 @@ public class ElasticsearchIOTest implements Serializable {
 
   @Test
   public void testSplit() throws Exception {
-    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+    ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
     PipelineOptions options = PipelineOptionsFactory.create();
     ElasticsearchIO.Read read =
         ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
index 2a2dbe9..3a9aae6 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.io.elasticsearch;
 
+import static java.net.InetAddress.getByName;
 
 import java.io.IOException;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 
 /**
  * Manipulates test data used by the {@link ElasticsearchIO}
@@ -49,6 +51,7 @@ public class ElasticsearchTestDataSet {
    * -Dexec.mainClass=org.apache.beam.sdk.io.elasticsearch.ElasticsearchTestDataSet \
    *   -Dexec.args="--elasticsearchServer=1.2.3.4 \
    *  --elasticsearchHttpPort=9200 \
+   *  --elasticsearchTcpPort=9300" \
    *   -Dexec.classpathScope=test
    *   </pre>
    *
@@ -59,20 +62,29 @@ public class ElasticsearchTestDataSet {
     PipelineOptionsFactory.register(IOTestPipelineOptions.class);
     IOTestPipelineOptions options =
         PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
-    createAndPopulateReadIndex(options);
+
+    createAndPopulateIndex(getClient(options), ReadOrWrite.READ);
   }
 
-  private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception {
-    RestClient restClient =  getConnectionConfiguration(options, ReadOrWrite.READ).createClient();
+  private static void createAndPopulateIndex(TransportClient client, ReadOrWrite rOw)
+      throws Exception {
     // automatically creates the index and insert docs
-    try {
-      ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
-    } finally {
-      restClient.close();
-    }
+    ElasticSearchIOTestUtils.insertTestDocuments(
+        (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, ES_TYPE, NUM_DOCS, client);
+  }
+
+  public static TransportClient getClient(IOTestPipelineOptions options) throws Exception {
+    TransportClient client =
+        TransportClient.builder()
+            .build()
+            .addTransportAddress(
+                new InetSocketTransportAddress(
+                    getByName(options.getElasticsearchServer()),
+                    options.getElasticsearchTcpPort()));
+    return client;
   }
 
-  static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
+  public static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
       IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException {
     ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
         ElasticsearchIO.ConnectionConfiguration.create(
@@ -87,9 +99,8 @@ public class ElasticsearchTestDataSet {
     return connectionConfiguration;
   }
 
-  static void deleteIndex(RestClient restClient, ReadOrWrite rOw) throws Exception {
-    ElasticSearchIOTestUtils
-        .deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, restClient);
+  public static void deleteIndex(TransportClient client, ReadOrWrite rOw) throws Exception {
+    ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, client);
   }
 
   /** Enum that tells whether we use the index for reading or for writing. */

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index a1495f2..8b53820 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -93,12 +93,7 @@
 
     <dependency>
       <groupId>com.google.api</groupId>
-      <artifactId>gax-grpc</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.cloud</groupId>
-      <artifactId>google-cloud-core-grpc</artifactId>
+      <artifactId>api-common</artifactId>
     </dependency>
 
     <dependency>
@@ -258,6 +253,11 @@
       <artifactId>proto-google-common-protos</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
     <!--  Test dependencies -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index e46b1d3..4393a63 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -58,7 +57,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index c5c2462..edb1e0d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -23,7 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
@@ -158,16 +158,21 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
     }
     // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
     // We must first use reflection to figure out what the type parameter is.
-    TypeDescriptor<?> superDescriptor =
-        TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
-    if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
-      throw new AssertionError(
-          "Couldn't find the DynamicDestinations superclass of " + this.getClass());
+    for (Type superclass = getClass().getGenericSuperclass();
+        superclass != null;
+        superclass = ((Class) superclass).getGenericSuperclass()) {
+      if (superclass instanceof ParameterizedType) {
+        ParameterizedType parameterized = (ParameterizedType) superclass;
+        if (parameterized.getRawType() == DynamicDestinations.class) {
+          // DestinationT is the second parameter.
+          Type parameter = parameterized.getActualTypeArguments()[1];
+          @SuppressWarnings("unchecked")
+          Class<DestinationT> parameterClass = (Class<DestinationT>) parameter;
+          return registry.getCoder(parameterClass);
+        }
+      }
     }
-    TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
-    @SuppressWarnings("unchecked")
-    TypeDescriptor<DestinationT> descriptor =
-        (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
-    return registry.getCoder(descriptor);
+    throw new AssertionError(
+        "Couldn't find the DynamicDestinations superclass of " + this.getClass());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
index 55672ff..90d41a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.ShardedKey;
 
 /**
  * Given a write to a specific table, assign that to one of the

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
new file mode 100644
index 0000000..c2b739f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A key and a shard number.
+ */
+class ShardedKey<K> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final K key;
+  private final int shardNumber;
+
+  public static <K> ShardedKey<K> of(K key, int shardNumber) {
+    return new ShardedKey<>(key, shardNumber);
+  }
+
+  ShardedKey(K key, int shardNumber) {
+    this.key = key;
+    this.shardNumber = shardNumber;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public int getShardNumber() {
+    return shardNumber;
+  }
+
+  @Override
+  public String toString() {
+    return "key: " + key + " shard: " + shardNumber;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ShardedKey)) {
+      return false;
+    }
+    ShardedKey<K> other = (ShardedKey<K>) o;
+    return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, shardNumber);
+  }
+}