You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:38 UTC
[11/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
index 6c9a4cb..d8ed121 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java
@@ -20,35 +20,25 @@ package org.apache.beam.runners.core;
import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.ServiceLoader;
+import java.util.Map;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
-import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsMapContaining;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -56,25 +46,27 @@ import org.junit.runners.JUnit4;
/** Tests for {@link BoundedSourceRunner}. */
@RunWith(JUnit4.class)
public class BoundedSourceRunnerTest {
-
- public static final String URN = "urn:org.apache.beam:source:java:0.1";
-
@Test
public void testRunReadLoopWithMultipleSources() throws Exception {
- List<WindowedValue<Long>> out1Values = new ArrayList<>();
+ List<WindowedValue<Long>> out1ValuesA = new ArrayList<>();
+ List<WindowedValue<Long>> out1ValuesB = new ArrayList<>();
List<WindowedValue<Long>> out2Values = new ArrayList<>();
- Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
- ImmutableList.of(out1Values::add, out2Values::add);
+ Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+ "out1", ImmutableList.of(out1ValuesA::add, out1ValuesB::add),
+ "out2", ImmutableList.of(out2Values::add));
- BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
+ BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+ new BoundedSourceRunner<>(
PipelineOptionsFactory.create(),
RunnerApi.FunctionSpec.getDefaultInstance(),
- consumers);
+ outputMap);
runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2)));
runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1)));
- assertThat(out1Values,
+ assertThat(out1ValuesA,
+ contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
+ assertThat(out1ValuesB,
contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
assertThat(out2Values,
contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L)));
@@ -82,106 +74,40 @@ public class BoundedSourceRunnerTest {
@Test
public void testRunReadLoopWithEmptySource() throws Exception {
- List<WindowedValue<Long>> outValues = new ArrayList<>();
- Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
- ImmutableList.of(outValues::add);
+ List<WindowedValue<Long>> out1Values = new ArrayList<>();
+ Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+ "out1", ImmutableList.of(out1Values::add));
- BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
+ BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+ new BoundedSourceRunner<>(
PipelineOptionsFactory.create(),
RunnerApi.FunctionSpec.getDefaultInstance(),
- consumers);
+ outputMap);
runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0)));
- assertThat(outValues, empty());
+ assertThat(out1Values, empty());
}
@Test
public void testStart() throws Exception {
List<WindowedValue<Long>> outValues = new ArrayList<>();
- Collection<ThrowingConsumer<WindowedValue<Long>>> consumers =
- ImmutableList.of(outValues::add);
+ Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of(
+ "out", ImmutableList.of(outValues::add));
ByteString encodedSource =
ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3)));
- BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
+ BoundedSourceRunner<BoundedSource<Long>, Long> runner =
+ new BoundedSourceRunner<>(
PipelineOptionsFactory.create(),
- RunnerApi.FunctionSpec.newBuilder().setParameter(
+ RunnerApi.FunctionSpec.newBuilder().setParameter(
Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(),
- consumers);
+ outputMap);
runner.start();
assertThat(outValues,
contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L)));
}
-
- @Test
- public void testCreatingAndProcessingSourceFromFactory() throws Exception {
- List<WindowedValue<String>> outputValues = new ArrayList<>();
-
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
- consumers.put("outputPC",
- (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) outputValues::add);
- List<ThrowingRunnable> startFunctions = new ArrayList<>();
- List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-
- RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
- .setUrn("urn:org.apache.beam:source:java:0.1")
- .setParameter(Any.pack(BytesValue.newBuilder()
- .setValue(ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
- .build()))
- .build();
-
- RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
- .setSpec(functionSpec)
- .putInputs("input", "inputPC")
- .putOutputs("output", "outputPC")
- .build();
-
- new BoundedSourceRunner.Factory<>().createRunnerForPTransform(
- PipelineOptionsFactory.create(),
- null /* beamFnDataClient */,
- "pTransformId",
- pTransform,
- Suppliers.ofInstance("57L")::get,
- ImmutableMap.of(),
- ImmutableMap.of(),
- consumers,
- startFunctions::add,
- finishFunctions::add);
-
- // This is testing a deprecated way of running sources and should be removed
- // once all source definitions are instead propagated along the input edge.
- Iterables.getOnlyElement(startFunctions).run();
- assertThat(outputValues, contains(
- valueInGlobalWindow(0L),
- valueInGlobalWindow(1L),
- valueInGlobalWindow(2L)));
- outputValues.clear();
-
- // Check that when passing a source along as an input, the source is processed.
- assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC"));
- Iterables.getOnlyElement(consumers.get("inputPC")).accept(
- valueInGlobalWindow(CountingSource.upTo(2)));
- assertThat(outputValues, contains(
- valueInGlobalWindow(0L),
- valueInGlobalWindow(1L)));
-
- assertThat(finishFunctions, Matchers.empty());
- }
-
- @Test
- public void testRegistration() {
- for (Registrar registrar :
- ServiceLoader.load(Registrar.class)) {
- if (registrar instanceof BoundedSourceRunner.Registrar) {
- assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
- return;
- }
- }
- fail("Expected registrar not found.");
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
deleted file mode 100644
index c4df77a..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow;
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.Message;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
-import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
-public class FnApiDoFnRunnerTest {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final Coder<WindowedValue<String>> STRING_CODER =
- WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
- private static final String STRING_CODER_SPEC_ID = "999L";
- private static final RunnerApi.Coder STRING_CODER_SPEC;
-
- static {
- try {
- STRING_CODER_SPEC = RunnerApi.Coder.newBuilder()
- .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
- .setSpec(RunnerApi.FunctionSpec.newBuilder()
- .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER))))
- .build())))
- .build())
- .build();
- } catch (IOException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
- private static class TestDoFn extends DoFn<String, String> {
- private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
- private static final TupleTag<String> additionalOutput = new TupleTag<>("output");
-
- private BoundedWindow window;
-
- @ProcessElement
- public void processElement(ProcessContext context, BoundedWindow window) {
- context.output("MainOutput" + context.element());
- context.output(additionalOutput, "AdditionalOutput" + context.element());
- this.window = window;
- }
-
- @FinishBundle
- public void finishBundle(FinishBundleContext context) {
- if (window != null) {
- context.output("FinishBundle", window.maxTimestamp(), window);
- window = null;
- }
- }
- }
-
- /**
- * Create a DoFn that has 2 inputs (inputATarget, inputBTarget) and 2 outputs
- * (mainOutput, output). Validate that inputs are fed to the {@link DoFn} and that outputs
- * are directed to the correct consumers.
- */
- @Test
- public void testCreatingAndProcessingDoFn() throws Exception {
- Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
- String pTransformId = "pTransformId";
- String mainOutputId = "101";
- String additionalOutputId = "102";
-
- DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn(
- new TestDoFn(),
- WindowingStrategy.globalDefault(),
- ImmutableList.of(),
- StringUtf8Coder.of(),
- Long.parseLong(mainOutputId),
- ImmutableMap.of(
- Long.parseLong(mainOutputId), TestDoFn.mainOutput,
- Long.parseLong(additionalOutputId), TestDoFn.additionalOutput));
- RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
- .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
- .setParameter(Any.pack(BytesValue.newBuilder()
- .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
- .build()))
- .build();
- RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
- .setSpec(functionSpec)
- .putInputs("inputA", "inputATarget")
- .putInputs("inputB", "inputBTarget")
- .putOutputs(mainOutputId, "mainOutputTarget")
- .putOutputs(additionalOutputId, "additionalOutputTarget")
- .build();
-
- List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
- List<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create();
- consumers.put("mainOutputTarget",
- (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) mainOutputValues::add);
- consumers.put("additionalOutputTarget",
- (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) additionalOutputValues::add);
- List<ThrowingRunnable> startFunctions = new ArrayList<>();
- List<ThrowingRunnable> finishFunctions = new ArrayList<>();
-
- new FnApiDoFnRunner.Factory<>().createRunnerForPTransform(
- PipelineOptionsFactory.create(),
- null /* beamFnDataClient */,
- pTransformId,
- pTransform,
- Suppliers.ofInstance("57L")::get,
- ImmutableMap.of(),
- ImmutableMap.of(),
- consumers,
- startFunctions::add,
- finishFunctions::add);
-
- Iterables.getOnlyElement(startFunctions).run();
- mainOutputValues.clear();
-
- assertThat(consumers.keySet(), containsInAnyOrder(
- "inputATarget", "inputBTarget", "mainOutputTarget", "additionalOutputTarget"));
-
- Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A1"));
- Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A2"));
- Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("B"));
- assertThat(mainOutputValues, contains(
- valueInGlobalWindow("MainOutputA1"),
- valueInGlobalWindow("MainOutputA2"),
- valueInGlobalWindow("MainOutputB")));
- assertThat(additionalOutputValues, contains(
- valueInGlobalWindow("AdditionalOutputA1"),
- valueInGlobalWindow("AdditionalOutputA2"),
- valueInGlobalWindow("AdditionalOutputB")));
- mainOutputValues.clear();
- additionalOutputValues.clear();
-
- Iterables.getOnlyElement(finishFunctions).run();
- assertThat(
- mainOutputValues,
- contains(
- timestampedValueInGlobalWindow("FinishBundle", GlobalWindow.INSTANCE.maxTimestamp())));
- mainOutputValues.clear();
- }
-
- @Test
- public void testRegistration() {
- for (Registrar registrar :
- ServiceLoader.load(Registrar.class)) {
- if (registrar instanceof FnApiDoFnRunner.Registrar) {
- assertThat(registrar.getPTransformRunnerFactories(),
- IsMapContaining.hasKey(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN));
- return;
- }
- }
- fail("Expected registrar not found.");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml
deleted file mode 100644
index 8da9448..0000000
--- a/sdks/java/io/amqp/pom.xml
+++ /dev/null
@@ -1,100 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>beam-sdks-java-io-amqp</artifactId>
- <name>Apache Beam :: SDKs :: Java :: IO :: AMQP</name>
- <description>IO to read and write using AMQP 1.0 protocol (http://www.amqp.org).</description>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-j</artifactId>
- <version>0.13.1</version>
- </dependency>
-
- <!-- compile dependencies -->
- <dependency>
- <groupId>com.google.auto.value</groupId>
- <artifactId>auto-value</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- <optional>true</optional>
- </dependency>
-
- <!-- test dependencies -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-direct-java</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
deleted file mode 100644
index 1f307b2..0000000
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Joiner;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.Tracker;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * AmqpIO supports the AMQP 1.0 protocol using the Apache Qpid Proton-J library.
- *
- * <p>It's also possible to use AMQP 1.0 protocol via Apache Qpid JMS connection factory and the
- * Apache Beam JmsIO.
- *
- * <h3>Binding AMQP and receiving messages</h3>
- *
- * <p>The {@link AmqpIO} {@link Read} can bind an AMQP listener endpoint and receive messages. It can
- * also connect to an AMQP broker (such as Apache Qpid or Apache ActiveMQ).
- *
- * <p>{@link AmqpIO} {@link Read} returns an unbounded {@link PCollection} of {@link Message}
- * containing the received messages.
- *
- * <p>To configure an AMQP source, you have to provide a list of addresses where it will receive
- * messages. An address has the following form: {@code
- * [amqp[s]://][user[:password]@]domain[/[name]]} where {@code domain} can be one of {@code
- * host | host:port | ip | ip:port | name}. NB: the {@code ~} character binds an AMQP
- * listener instead of connecting to a remote broker. For instance {@code amqp://~0.0.0.0:1234}
- * will bind an AMQP listener on any network interface on port 1234.
- *
- * <p>The following example illustrates how to configure an AMQP source:
- *
- * <pre>{@code
- *
- * pipeline.apply(AmqpIO.read()
- * .withAddresses(Collections.singletonList("amqp://host:1234")))
- *
- * }</pre>
- *
- * <h3>Sending messages to an AMQP endpoint</h3>
- *
- * <p>{@link AmqpIO} provides a sink to send {@link PCollection} elements as messages.
- *
- * <p>As for the {@link Read}, {@link AmqpIO} {@link Write} requires a list of addresses where to
- * send messages. The following example illustrates how to configure the {@link AmqpIO}
- * {@link Write}:
- *
- * <pre>{@code
- *
- * pipeline
- * .apply(...) // provide PCollection<Message>
- * .apply(AmqpIO.write());
- *
- * }</pre>
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class AmqpIO {
-
- public static Read read() {
- return new AutoValue_AmqpIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build();
- }
-
- public static Write write() {
- return new AutoValue_AmqpIO_Write();
- }
-
- private AmqpIO() {
- }
-
- /**
- * A {@link PTransform} to read/receive messages using AMQP 1.0 protocol.
- */
- @AutoValue
- public abstract static class Read extends PTransform<PBegin, PCollection<Message>> {
-
- @Nullable abstract List<String> addresses();
- abstract long maxNumRecords();
- @Nullable abstract Duration maxReadTime();
-
- abstract Builder builder();
-
- @AutoValue.Builder
- abstract static class Builder {
- abstract Builder setAddresses(List<String> addresses);
- abstract Builder setMaxNumRecords(long maxNumRecords);
- abstract Builder setMaxReadTime(Duration maxReadTime);
- abstract Read build();
- }
-
- /**
- * Define the AMQP addresses where to receive messages.
- */
- public Read withAddresses(List<String> addresses) {
- checkArgument(addresses != null, "AmqpIO.read().withAddresses(addresses) called with null"
- + " addresses");
- checkArgument(!addresses.isEmpty(), "AmqpIO.read().withAddresses(addresses) called with "
- + "empty addresses list");
- return builder().setAddresses(addresses).build();
- }
-
- /**
- * Define the max number of records received by the {@link Read}.
- * When the max number of records is lower than {@code Long.MAX_VALUE}, the {@link Read} will
- * provide a bounded {@link PCollection}.
- */
- public Read withMaxNumRecords(long maxNumRecords) {
- checkArgument(maxReadTime() == null,
- "maxNumRecord and maxReadTime are exclusive");
- return builder().setMaxNumRecords(maxNumRecords).build();
- }
-
- /**
- * Define the max read time (duration) while the {@link Read} will receive messages.
- * When this max read time is not null, the {@link Read} will provide a bounded
- * {@link PCollection}.
- */
- public Read withMaxReadTime(Duration maxReadTime) {
- checkArgument(maxNumRecords() == Long.MAX_VALUE,
- "maxNumRecord and maxReadTime are exclusive");
- return builder().setMaxReadTime(maxReadTime).build();
- }
-
- @Override
- public void validate(PipelineOptions pipelineOptions) {
- checkState(addresses() != null, "AmqIO.read() requires addresses list to be set via "
- + "withAddresses(addresses)");
- checkState(!addresses().isEmpty(), "AmqIO.read() requires a non-empty addresses list to be"
- + " set via withAddresses(addresses)");
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.add(DisplayData.item("addresses", Joiner.on(" ").join(addresses())));
- }
-
- @Override
- public PCollection<Message> expand(PBegin input) {
- org.apache.beam.sdk.io.Read.Unbounded<Message> unbounded =
- org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this));
-
- PTransform<PBegin, PCollection<Message>> transform = unbounded;
-
- if (maxNumRecords() != Long.MAX_VALUE) {
- transform = unbounded.withMaxNumRecords(maxNumRecords());
- } else if (maxReadTime() != null) {
- transform = unbounded.withMaxReadTime(maxReadTime());
- }
-
- return input.getPipeline().apply(transform);
- }
-
- }
-
- private static class AmqpCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
-
- private transient Messenger messenger;
- private transient List<Tracker> trackers = new ArrayList<>();
-
- public AmqpCheckpointMark() {
- }
-
- @Override
- public void finalizeCheckpoint() {
- for (Tracker tracker : trackers) {
- // flag as not cumulative
- messenger.accept(tracker, 0);
- }
- trackers.clear();
- }
-
- // reset trackers to an empty list when deserializing
- private void readObject(java.io.ObjectInputStream stream)
- throws java.io.IOException, ClassNotFoundException {
- trackers = new ArrayList<>();
- }
-
- }
-
- private static class UnboundedAmqpSource
- extends UnboundedSource<Message, AmqpCheckpointMark> {
-
- private final Read spec;
-
- public UnboundedAmqpSource(Read spec) {
- this.spec = spec;
- }
-
- @Override
- public List<UnboundedAmqpSource> split(int desiredNumSplits,
- PipelineOptions pipelineOptions) {
- // amqp is a queue system, so, it's possible to have multiple concurrent sources, even if
- // they bind the listener
- List<UnboundedAmqpSource> sources = new ArrayList<>();
- for (int i = 0; i < Math.max(1, desiredNumSplits); ++i) {
- sources.add(new UnboundedAmqpSource(spec));
- }
- return sources;
- }
-
- @Override
- public UnboundedReader<Message> createReader(PipelineOptions pipelineOptions,
- AmqpCheckpointMark checkpointMark) {
- return new UnboundedAmqpReader(this, checkpointMark);
- }
-
- @Override
- public Coder<Message> getDefaultOutputCoder() {
- return new AmqpMessageCoder();
- }
-
- @Override
- public Coder<AmqpCheckpointMark> getCheckpointMarkCoder() {
- return SerializableCoder.of(AmqpCheckpointMark.class);
- }
-
- @Override
- public void validate() {
- spec.validate(null);
- }
-
- }
-
- private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader<Message> {
-
- private final UnboundedAmqpSource source;
-
- private Messenger messenger;
- private Message current;
- private Instant currentTimestamp;
- private Instant watermark = new Instant(Long.MIN_VALUE);
- private AmqpCheckpointMark checkpointMark;
-
- public UnboundedAmqpReader(UnboundedAmqpSource source, AmqpCheckpointMark checkpointMark) {
- this.source = source;
- this.current = null;
- if (checkpointMark != null) {
- this.checkpointMark = checkpointMark;
- } else {
- this.checkpointMark = new AmqpCheckpointMark();
- }
- }
-
- @Override
- public Instant getWatermark() {
- return watermark;
- }
-
- @Override
- public Instant getCurrentTimestamp() {
- if (current == null) {
- throw new NoSuchElementException();
- }
- return currentTimestamp;
- }
-
- @Override
- public Message getCurrent() {
- if (current == null) {
- throw new NoSuchElementException();
- }
- return current;
- }
-
- @Override
- public UnboundedSource.CheckpointMark getCheckpointMark() {
- return checkpointMark;
- }
-
- @Override
- public UnboundedAmqpSource getCurrentSource() {
- return source;
- }
-
- @Override
- public boolean start() throws IOException {
- Read spec = source.spec;
- messenger = Messenger.Factory.create();
- messenger.start();
- for (String address : spec.addresses()) {
- messenger.subscribe(address);
- }
- checkpointMark.messenger = messenger;
- return advance();
- }
-
- @Override
- public boolean advance() {
- messenger.recv();
- if (messenger.incoming() <= 0) {
- current = null;
- return false;
- }
- Message message = messenger.get();
- Tracker tracker = messenger.incomingTracker();
- checkpointMark.trackers.add(tracker);
- currentTimestamp = new Instant(message.getCreationTime());
- watermark = currentTimestamp;
- current = message;
- return true;
- }
-
- @Override
- public void close() {
- if (messenger != null) {
- messenger.stop();
- }
- }
-
- }
-
- /**
- * A {@link PTransform} to send messages using AMQP 1.0 protocol.
- */
- @AutoValue
- public abstract static class Write extends PTransform<PCollection<Message>, PDone> {
-
- @Override
- public PDone expand(PCollection<Message> input) {
- input.apply(ParDo.of(new WriteFn(this)));
- return PDone.in(input.getPipeline());
- }
-
- private static class WriteFn extends DoFn<Message, Void> {
-
- private final Write spec;
-
- private transient Messenger messenger;
-
- public WriteFn(Write spec) {
- this.spec = spec;
- }
-
- @Setup
- public void setup() throws Exception {
- messenger = Messenger.Factory.create();
- messenger.start();
- }
-
- @ProcessElement
- public void processElement(ProcessContext processContext) throws Exception {
- Message message = processContext.element();
- messenger.put(message);
- messenger.send();
- }
-
- @Teardown
- public void teardown() throws Exception {
- if (messenger != null) {
- messenger.stop();
- }
- }
-
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
deleted file mode 100644
index 5a55260..0000000
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import com.google.common.io.ByteStreams;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.BufferOverflowException;
-
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.util.VarInt;
-import org.apache.qpid.proton.message.Message;
-
-/**
- * A coder for AMQP message.
- */
-public class AmqpMessageCoder extends CustomCoder<Message> {
-
- private static final int[] MESSAGE_SIZES = new int[]{
- 8 * 1024,
- 64 * 1024,
- 1 * 1024 * 1024,
- 64 * 1024 * 1024
- };
-
- static AmqpMessageCoder of() {
- return new AmqpMessageCoder();
- }
-
- @Override
- public void encode(Message value, OutputStream outStream) throws CoderException, IOException {
- for (int maxMessageSize : MESSAGE_SIZES) {
- try {
- encode(value, outStream, maxMessageSize);
- return;
- } catch (Exception e) {
- continue;
- }
- }
- throw new CoderException("Message is larger than the max size supported by the coder");
- }
-
- private void encode(Message value, OutputStream outStream, int messageSize) throws
- IOException, BufferOverflowException {
- byte[] data = new byte[messageSize];
- int bytesWritten = value.encode(data, 0, data.length);
- VarInt.encode(bytesWritten, outStream);
- outStream.write(data, 0, bytesWritten);
- }
-
- @Override
- public Message decode(InputStream inStream) throws CoderException, IOException {
- Message message = Message.Factory.create();
- int bytesToRead = VarInt.decodeInt(inStream);
- byte[] encodedMessage = new byte[bytesToRead];
- ByteStreams.readFully(inStream, encodedMessage);
- message.decode(encodedMessage, 0, encodedMessage.length);
- return message;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
deleted file mode 100644
index bc3445c..0000000
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviderRegistrar;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.qpid.proton.message.Message;
-
-/**
- * A {@link CoderProviderRegistrar} for standard types used with {@link AmqpIO}.
- */
-@AutoService(CoderProviderRegistrar.class)
-public class AmqpMessageCoderProviderRegistrar implements CoderProviderRegistrar {
-
- @Override
- public List<CoderProvider> getCoderProviders() {
- return ImmutableList.of(
- CoderProviders.forCoder(TypeDescriptor.of(Message.class),
- AmqpMessageCoder.of()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
deleted file mode 100644
index 091f234..0000000
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing using AMQP 1.0 protocol.
- */
-package org.apache.beam.sdk.io.amqp;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
deleted file mode 100644
index c8fe4e8..0000000
--- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tests on {@link AmqpIO}.
- */
-@RunWith(JUnit4.class)
-public class AmqpIOTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class);
-
- private int port;
-
- @Rule public TestPipeline pipeline = TestPipeline.create();
-
- @Before
- public void findFreeNetworkPort() throws Exception {
- LOG.info("Finding free network port");
- ServerSocket socket = new ServerSocket(0);
- port = socket.getLocalPort();
- socket.close();
- }
-
- @Test
- public void testRead() throws Exception {
- PCollection<Message> output = pipeline.apply(AmqpIO.read()
- .withMaxNumRecords(100)
- .withAddresses(Collections.singletonList("amqp://~localhost:" + port)));
- PAssert.thatSingleton(output.apply(Count.<Message>globally())).isEqualTo(100L);
-
- Thread sender = new Thread() {
- public void run() {
- try {
- Thread.sleep(500);
- Messenger sender = Messenger.Factory.create();
- sender.start();
- for (int i = 0; i < 100; i++) {
- Message message = Message.Factory.create();
- message.setAddress("amqp://localhost:" + port);
- message.setBody(new AmqpValue("Test " + i));
- sender.put(message);
- sender.send();
- }
- sender.stop();
- } catch (Exception e) {
- LOG.error("Sender error", e);
- }
- }
- };
- try {
- sender.start();
- pipeline.run();
- } finally {
- sender.join();
- }
- }
-
- @Test
- public void testWrite() throws Exception {
- final List<String> received = new ArrayList<>();
- Thread receiver = new Thread() {
- @Override
- public void run() {
- try {
- Messenger messenger = Messenger.Factory.create();
- messenger.start();
- messenger.subscribe("amqp://~localhost:" + port);
- while (received.size() < 100) {
- messenger.recv();
- while (messenger.incoming() > 0) {
- Message message = messenger.get();
- LOG.info("Received: " + message.getBody().toString());
- received.add(message.getBody().toString());
- }
- }
- messenger.stop();
- } catch (Exception e) {
- LOG.error("Receiver error", e);
- }
- }
- };
- LOG.info("Starting AMQP receiver");
- receiver.start();
-
- List<Message> data = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- Message message = Message.Factory.create();
- message.setBody(new AmqpValue("Test " + i));
- message.setAddress("amqp://localhost:" + port);
- message.setSubject("test");
- data.add(message);
- }
- pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write());
- LOG.info("Starting pipeline");
- try {
- pipeline.run();
- } finally {
- LOG.info("Join receiver thread");
- receiver.join();
- }
-
- assertEquals(100, received.size());
- for (int i = 0; i < 100; i++) {
- assertTrue(received.contains("AmqpValue{Test " + i + "}"));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
deleted file mode 100644
index 7a8efeb..0000000
--- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.amqp;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.base.Joiner;
-
-import java.util.Collections;
-
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.message.Message;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test on {@link AmqpMessageCoder}.
- */
-@RunWith(JUnit4.class)
-public class AmqpMessageCoderTest {
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void encodeDecode() throws Exception {
- Message message = Message.Factory.create();
- message.setBody(new AmqpValue("body"));
- message.setAddress("address");
- message.setSubject("test");
- AmqpMessageCoder coder = AmqpMessageCoder.of();
-
- Message clone = CoderUtils.clone(coder, message);
-
- assertEquals("AmqpValue{body}", clone.getBody().toString());
- assertEquals("address", clone.getAddress());
- assertEquals("test", clone.getSubject());
- }
-
- @Test
- public void encodeDecodeTooMuchLargerMessage() throws Exception {
- thrown.expect(CoderException.class);
- Message message = Message.Factory.create();
- message.setAddress("address");
- message.setSubject("subject");
- String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " "));
- message.setBody(new AmqpValue(body));
-
- AmqpMessageCoder coder = AmqpMessageCoder.of();
-
- byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
- }
-
- @Test
- public void encodeDecodeLargeMessage() throws Exception {
- Message message = Message.Factory.create();
- message.setAddress("address");
- message.setSubject("subject");
- String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
- message.setBody(new AmqpValue(body));
-
- AmqpMessageCoder coder = AmqpMessageCoder.of();
-
- Message clone = CoderUtils.clone(coder, message);
-
- clone.getBody().toString().equals(message.getBody().toString());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml
index c74477e..8249f57 100644
--- a/sdks/java/io/cassandra/pom.xml
+++ b/sdks/java/io/cassandra/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 32905b7..b6f4ef6 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -82,7 +82,7 @@ import org.slf4j.LoggerFactory;
* .withEntity(Person.class));
* }</pre>
*/
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
public class CassandraIO {
private static final Logger LOG = LoggerFactory.getLogger(CassandraIO.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index df0d94b..f7525fd 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 25ab929..387fd22 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -71,7 +71,11 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
Integer getElasticsearchHttpPort();
void setElasticsearchHttpPort(Integer value);
- /* Cassandra */
+ @Description("Tcp port for elasticsearch server")
+ @Default.Integer(9300)
+ Integer getElasticsearchTcpPort();
+ void setElasticsearchTcpPort(Integer value);
+
@Description("Host for Cassandra server (host name/ip address)")
@Default.String("cassandra-host")
String getCassandraHost();
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index e0a7f21..03632ce 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -137,14 +137,6 @@
<scope>test</scope>
</dependency>
- <!-- This optional dependency is used by the test framework. Avoids a warning -->
- <dependency>
- <groupId>net.java.dev.jna</groupId>
- <artifactId>jna</artifactId>
- <version>4.1.0</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 4d76887..f6ceef2 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -113,7 +113,7 @@ import org.elasticsearch.client.RestClientBuilder;
* <p>Optionally, you can provide {@code withBatchSize()} and {@code withBatchSizeBytes()}
* to specify the size of the write batch in number of documents or in bytes.
*/
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
public class ElasticsearchIO {
public static Read read() {
@@ -139,7 +139,7 @@ public class ElasticsearchIO {
private static final ObjectMapper mapper = new ObjectMapper();
- static JsonNode parseResponse(Response response) throws IOException {
+ private static JsonNode parseResponse(Response response) throws IOException {
return mapper.readValue(response.getEntity().getContent(), JsonNode.class);
}
@@ -264,7 +264,7 @@ public class ElasticsearchIO {
builder.addIfNotNull(DisplayData.item("username", getUsername()));
}
- RestClient createClient() throws MalformedURLException {
+ private RestClient createClient() throws MalformedURLException {
HttpHost[] hosts = new HttpHost[getAddresses().size()];
int i = 0;
for (String address : getAddresses()) {
@@ -455,7 +455,16 @@ public class ElasticsearchIO {
while (shards.hasNext()) {
Map.Entry<String, JsonNode> shardJson = shards.next();
String shardId = shardJson.getKey();
- sources.add(new BoundedElasticsearchSource(spec, shardId));
+ JsonNode value = (JsonNode) shardJson.getValue();
+ boolean isPrimaryShard =
+ value
+ .path(0)
+ .path("routing")
+ .path("primary")
+ .asBoolean();
+ if (isPrimaryShard) {
+ sources.add(new BoundedElasticsearchSource(spec, shardId));
+ }
}
checkArgument(!sources.isEmpty(), "No primary shard found");
return sources;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index 203963d..b0d161f 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -17,17 +17,19 @@
*/
package org.apache.beam.sdk.io.elasticsearch;
-import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import org.apache.http.HttpEntity;
-import org.apache.http.entity.ContentType;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.nio.entity.NStringEntity;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestClient;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.index.IndexNotFoundException;
/** Test utilities to use with {@link ElasticsearchIO}. */
class ElasticSearchIOTestUtils {
@@ -39,68 +41,57 @@ class ElasticSearchIOTestUtils {
}
/** Deletes the given index synchronously. */
- static void deleteIndex(String index, RestClient restClient) throws IOException {
- try {
- restClient.performRequest("DELETE", String.format("/%s", index), new BasicHeader("", ""));
- } catch (IOException e) {
- // it is fine to ignore this expression as deleteIndex occurs in @before,
- // so when the first tests is run, the index does not exist yet
- if (!e.getMessage().contains("index_not_found_exception")){
- throw e;
- }
+ static void deleteIndex(String index, Client client) throws Exception {
+ IndicesAdminClient indices = client.admin().indices();
+ IndicesExistsResponse indicesExistsResponse =
+ indices.exists(new IndicesExistsRequest(index)).get();
+ if (indicesExistsResponse.isExists()) {
+ indices.prepareClose(index).get();
+ indices.delete(Requests.deleteIndexRequest(index)).get();
}
}
/** Inserts the given number of test documents into Elasticsearch. */
- static void insertTestDocuments(String index, String type, long numDocs, RestClient restClient)
- throws IOException {
+ static void insertTestDocuments(String index, String type, long numDocs, Client client)
+ throws Exception {
+ final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefresh(true);
List<String> data =
ElasticSearchIOTestUtils.createDocuments(
numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
- StringBuilder bulkRequest = new StringBuilder();
for (String document : data) {
- bulkRequest.append(String.format("{ \"index\" : {} }%n%s%n", document));
+ bulkRequestBuilder.add(client.prepareIndex(index, type, null).setSource(document));
}
- String endPoint = String.format("/%s/%s/_bulk", index, type);
- HttpEntity requestBody =
- new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
- Response response = restClient.performRequest("POST", endPoint,
- Collections.singletonMap("refresh", "true"), requestBody,
- new BasicHeader("", ""));
- JsonNode searchResult = ElasticsearchIO.parseResponse(response);
- boolean errors = searchResult.path("errors").asBoolean();
- if (errors){
+ final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
+ if (bulkResponse.hasFailures()) {
throw new IOException(
- String.format("Failed to insert test documents in index %s", index));
+ String.format(
+ "Cannot insert test documents in index %s : %s",
+ index, bulkResponse.buildFailureMessage()));
}
}
/**
- * Forces a refresh of the given index to make recently inserted documents available for search.
+ * Forces an upgrade of the given index to make recently inserted documents available for search.
*
* @return The number of docs in the index
*/
- static long refreshIndexAndGetCurrentNumDocs(String index, String type, RestClient restClient)
- throws IOException {
- long result = 0;
+ static long upgradeIndexAndGetCurrentNumDocs(String index, String type, Client client) {
try {
- String endPoint = String.format("/%s/_refresh", index);
- restClient.performRequest("POST", endPoint, new BasicHeader("", ""));
-
- endPoint = String.format("/%s/%s/_search", index, type);
- Response response = restClient.performRequest("GET", endPoint, new BasicHeader("", ""));
- JsonNode searchResult = ElasticsearchIO.parseResponse(response);
- result = searchResult.path("hits").path("total").asLong();
- } catch (IOException e) {
+ client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet();
+ SearchResponse response =
+ client.prepareSearch(index).setTypes(type).execute().actionGet(5000);
+ return response.getHits().getTotalHits();
// it is fine to ignore bellow exceptions because in testWriteWithBatchSize* sometimes,
// we call upgrade before any doc have been written
// (when there are fewer docs processed than batchSize).
// In that cases index/type has not been created (created upon first doc insertion)
- if (!e.getMessage().contains("index_not_found_exception")){
+ } catch (IndexNotFoundException e) {
+ } catch (java.lang.IllegalArgumentException e) {
+ if (!e.getMessage().contains("No search type")) {
throw e;
}
}
- return result;
+ return 0;
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 7c37e87..2d6393a 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
-import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.transport.TransportClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
*/
public class ElasticsearchIOIT {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOIT.class);
- private static RestClient restClient;
+ private static TransportClient client;
private static IOTestPipelineOptions options;
private static ElasticsearchIO.ConnectionConfiguration readConnectionConfiguration;
@Rule public TestPipeline pipeline = TestPipeline.create();
@@ -66,16 +66,16 @@ public class ElasticsearchIOIT {
public static void beforeClass() throws Exception {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
options = TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+ client = ElasticsearchTestDataSet.getClient(options);
readConnectionConfiguration =
ElasticsearchTestDataSet.getConnectionConfiguration(
options, ElasticsearchTestDataSet.ReadOrWrite.READ);
- restClient = readConnectionConfiguration.createClient();
}
@AfterClass
public static void afterClass() throws Exception {
- ElasticsearchTestDataSet.deleteIndex(restClient, ElasticsearchTestDataSet.ReadOrWrite.WRITE);
- restClient.close();
+ ElasticsearchTestDataSet.deleteIndex(client, ElasticsearchTestDataSet.ReadOrWrite.WRITE);
+ client.close();
}
@Test
@@ -128,8 +128,8 @@ public class ElasticsearchIOIT {
pipeline.run();
long currentNumDocs =
- ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
- ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, restClient);
+ ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+ ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, client);
assertEquals(ElasticsearchTestDataSet.NUM_DOCS, currentNumDocs);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index b349a29..260af79 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -39,11 +39,11 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.PCollection;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
import org.hamcrest.CustomMatcher;
import org.junit.AfterClass;
import org.junit.Before;
@@ -74,10 +74,9 @@ public class ElasticsearchIOTest implements Serializable {
private static final long BATCH_SIZE_BYTES = 2048L;
private static Node node;
- private static RestClient restClient;
private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration;
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
@Rule
public TestPipeline pipeline = TestPipeline.create();
@@ -92,8 +91,8 @@ public class ElasticsearchIOTest implements Serializable {
.put("cluster.name", "beam")
.put("http.enabled", "true")
.put("node.data", "true")
- .put("path.data", TEMPORARY_FOLDER.getRoot().getPath())
- .put("path.home", TEMPORARY_FOLDER.getRoot().getPath())
+ .put("path.data", folder.getRoot().getPath())
+ .put("path.home", folder.getRoot().getPath())
.put("node.name", "beam")
.put("network.host", ES_IP)
.put("http.port", esHttpPort)
@@ -101,29 +100,27 @@ public class ElasticsearchIOTest implements Serializable {
// had problems with some jdk, embedded ES was too slow for bulk insertion,
// and queue of 50 was full. No pb with real ES instance (cf testWrite integration test)
.put("threadpool.bulk.queue_size", 100);
- node = new Node(settingsBuilder.build());
+ node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build();
LOG.info("Elasticsearch node created");
node.start();
connectionConfiguration =
ElasticsearchIO.ConnectionConfiguration.create(
new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
- restClient = connectionConfiguration.createClient();
}
@AfterClass
- public static void afterClass() throws IOException{
- restClient.close();
+ public static void afterClass() {
node.close();
}
@Before
public void before() throws Exception {
- ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, restClient);
+ ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client());
}
@Test
public void testSizes() throws Exception {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
PipelineOptions options = PipelineOptionsFactory.create();
ElasticsearchIO.Read read =
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
@@ -137,7 +134,7 @@ public class ElasticsearchIOTest implements Serializable {
@Test
public void testRead() throws Exception {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
PCollection<String> output =
pipeline.apply(
@@ -153,7 +150,7 @@ public class ElasticsearchIOTest implements Serializable {
@Test
public void testReadWithQuery() throws Exception {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
String query =
"{\n"
@@ -188,7 +185,7 @@ public class ElasticsearchIOTest implements Serializable {
pipeline.run();
long currentNumDocs =
- ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
+ ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client());
assertEquals(NUM_DOCS, currentNumDocs);
QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("Einstein").field("scientist");
@@ -261,8 +258,9 @@ public class ElasticsearchIOTest implements Serializable {
if ((numDocsProcessed % 100) == 0) {
// force the index to upgrade after inserting for the inserted docs
// to be searchable immediately
- long currentNumDocs = ElasticSearchIOTestUtils
- .refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
+ long currentNumDocs =
+ ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+ ES_INDEX, ES_TYPE, node.client());
if ((numDocsProcessed % BATCH_SIZE) == 0) {
/* bundle end */
assertEquals(
@@ -306,8 +304,8 @@ public class ElasticsearchIOTest implements Serializable {
// force the index to upgrade after inserting for the inserted docs
// to be searchable immediately
long currentNumDocs =
- ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
- ES_INDEX, ES_TYPE, restClient);
+ ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+ ES_INDEX, ES_TYPE, node.client());
if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) {
/* bundle end */
assertThat(
@@ -329,7 +327,7 @@ public class ElasticsearchIOTest implements Serializable {
@Test
public void testSplit() throws Exception {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
+ ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
PipelineOptions options = PipelineOptionsFactory.create();
ElasticsearchIO.Read read =
ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
index 2a2dbe9..3a9aae6 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -17,11 +17,13 @@
*/
package org.apache.beam.sdk.io.elasticsearch;
+import static java.net.InetAddress.getByName;
import java.io.IOException;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
/**
* Manipulates test data used by the {@link ElasticsearchIO}
@@ -49,6 +51,7 @@ public class ElasticsearchTestDataSet {
* -Dexec.mainClass=org.apache.beam.sdk.io.elasticsearch.ElasticsearchTestDataSet \
* -Dexec.args="--elasticsearchServer=1.2.3.4 \
* --elasticsearchHttpPort=9200 \
+ * --elasticsearchTcpPort=9300" \
* -Dexec.classpathScope=test
* </pre>
*
@@ -59,20 +62,29 @@ public class ElasticsearchTestDataSet {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
IOTestPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
- createAndPopulateReadIndex(options);
+
+ createAndPopulateIndex(getClient(options), ReadOrWrite.READ);
}
- private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception {
- RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ).createClient();
+ private static void createAndPopulateIndex(TransportClient client, ReadOrWrite rOw)
+ throws Exception {
// automatically creates the index and insert docs
- try {
- ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient);
- } finally {
- restClient.close();
- }
+ ElasticSearchIOTestUtils.insertTestDocuments(
+ (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, ES_TYPE, NUM_DOCS, client);
+ }
+
+ public static TransportClient getClient(IOTestPipelineOptions options) throws Exception {
+ TransportClient client =
+ TransportClient.builder()
+ .build()
+ .addTransportAddress(
+ new InetSocketTransportAddress(
+ getByName(options.getElasticsearchServer()),
+ options.getElasticsearchTcpPort()));
+ return client;
}
- static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
+ public static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration(
IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException {
ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
ElasticsearchIO.ConnectionConfiguration.create(
@@ -87,9 +99,8 @@ public class ElasticsearchTestDataSet {
return connectionConfiguration;
}
- static void deleteIndex(RestClient restClient, ReadOrWrite rOw) throws Exception {
- ElasticSearchIOTestUtils
- .deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, restClient);
+ public static void deleteIndex(TransportClient client, ReadOrWrite rOw) throws Exception {
+ ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, client);
}
/** Enum that tells whether we use the index for reading or for writing. */
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index a1495f2..8b53820 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -93,12 +93,7 @@
<dependency>
<groupId>com.google.api</groupId>
- <artifactId>gax-grpc</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.cloud</groupId>
- <artifactId>google-cloud-core-grpc</artifactId>
+ <artifactId>api-common</artifactId>
</dependency>
<dependency>
@@ -258,6 +253,11 @@
<artifactId>proto-google-common-protos</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index e46b1d3..4393a63 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -58,7 +57,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index c5c2462..edb1e0d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -23,7 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.Lists;
import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
/**
@@ -158,16 +158,21 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
}
// If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
// We must first use reflection to figure out what the type parameter is.
- TypeDescriptor<?> superDescriptor =
- TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
- if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
- throw new AssertionError(
- "Couldn't find the DynamicDestinations superclass of " + this.getClass());
+ for (Type superclass = getClass().getGenericSuperclass();
+ superclass != null;
+ superclass = ((Class) superclass).getGenericSuperclass()) {
+ if (superclass instanceof ParameterizedType) {
+ ParameterizedType parameterized = (ParameterizedType) superclass;
+ if (parameterized.getRawType() == DynamicDestinations.class) {
+ // DestinationT is the second parameter.
+ Type parameter = parameterized.getActualTypeArguments()[1];
+ @SuppressWarnings("unchecked")
+ Class<DestinationT> parameterClass = (Class<DestinationT>) parameter;
+ return registry.getCoder(parameterClass);
+ }
+ }
}
- TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
- @SuppressWarnings("unchecked")
- TypeDescriptor<DestinationT> descriptor =
- (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
- return registry.getCoder(descriptor);
+ throw new AssertionError(
+ "Couldn't find the DynamicDestinations superclass of " + this.getClass());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
index 55672ff..90d41a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.ShardedKey;
/**
* Given a write to a specific table, assign that to one of the
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
new file mode 100644
index 0000000..c2b739f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A key and a shard number.
+ */
+class ShardedKey<K> implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final K key;
+ private final int shardNumber;
+
+ public static <K> ShardedKey<K> of(K key, int shardNumber) {
+ return new ShardedKey<>(key, shardNumber);
+ }
+
+ ShardedKey(K key, int shardNumber) {
+ this.key = key;
+ this.shardNumber = shardNumber;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public int getShardNumber() {
+ return shardNumber;
+ }
+
+ @Override
+ public String toString() {
+ return "key: " + key + " shard: " + shardNumber;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof ShardedKey)) {
+ return false;
+ }
+ ShardedKey<K> other = (ShardedKey<K>) o;
+ return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, shardNumber);
+ }
+}