Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/08 19:01:19 UTC
[1/2] beam git commit: [BEAM-2657] Create Solr IO
Repository: beam
Updated Branches:
refs/heads/master 2fa4fdecd -> f5714f220
[BEAM-2657] Create Solr IO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d00ff9e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d00ff9e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d00ff9e2
Branch: refs/heads/master
Commit: d00ff9e215092af2666459b742f62c6b0bb4bff9
Parents: 2fa4fde
Author: Cao Manh Dat <da...@apache.org>
Authored: Sat Jul 22 14:38:07 2017 +0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 8 11:54:09 2017 -0700
----------------------------------------------------------------------
pom.xml | 8 +-
.../sdk/io/common/IOTestPipelineOptions.java | 6 +
sdks/java/io/pom.xml | 1 +
sdks/java/io/solr/pom.xml | 147 ++++
.../beam/sdk/io/solr/AuthorizedSolrClient.java | 91 +++
.../beam/sdk/io/solr/JavaBinCodecCoder.java | 98 +++
.../org/apache/beam/sdk/io/solr/SolrIO.java | 717 +++++++++++++++++++
.../apache/beam/sdk/io/solr/package-info.java | 20 +
.../beam/sdk/io/solr/JavaBinCodecCoderTest.java | 81 +++
.../org/apache/beam/sdk/io/solr/SolrIOTest.java | 269 +++++++
.../beam/sdk/io/solr/SolrIOTestUtils.java | 132 ++++
.../resources/cloud-minimal/conf/schema.xml | 29 +
.../resources/cloud-minimal/conf/solrconfig.xml | 48 ++
sdks/java/javadoc/pom.xml | 5 +
14 files changed, 1651 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 80ab6e2..1bdaa97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -524,7 +524,13 @@
<version>${project.version}</version>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-solr</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 25ab929..256c94d 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -71,6 +71,12 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
Integer getElasticsearchHttpPort();
void setElasticsearchHttpPort(Integer value);
+ /* Solr */
+ @Description("Address of Zookeeper server for Solr")
+ @Default.String("zookeeper-server")
+ String getSolrZookeeperServer();
+ void setSolrZookeeperServer(String value);
+
/* Cassandra */
@Description("Host for Cassandra server (host name/ip address)")
@Default.String("cassandra-host")
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 4e02aa8..c291e5d 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -56,6 +56,7 @@
<module>kinesis</module>
<module>mongodb</module>
<module>mqtt</module>
+ <module>solr</module>
<module>xml</module>
</modules>
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/pom.xml b/sdks/java/io/solr/pom.xml
new file mode 100644
index 0000000..a757a57
--- /dev/null
+++ b/sdks/java/io/solr/pom.xml
@@ -0,0 +1,147 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <groupId>org.apache.beam</groupId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>beam-sdks-java-io-solr</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Solr</name>
+ <description>IO to read and write from/to Solr.</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>5.5.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </dependency>
+
+ <!-- compile dependencies -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.4.1</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-common</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-test-framework</artifactId>
+ <version>5.5.4</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-core</artifactId>
+ <version>5.5.4</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.carrotsearch.randomizedtesting</groupId>
+ <artifactId>randomizedtesting-runner</artifactId>
+ <version>2.3.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java
new file mode 100644
index 0000000..44d7b88
--- /dev/null
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.beam.sdk.io.solr.SolrIO.ConnectionConfiguration;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.params.SolrParams;
+
+/**
+ * Client for interacting with Solr.
+ * @param <ClientT> type of SolrClient
+ */
+class AuthorizedSolrClient<ClientT extends SolrClient> implements Closeable {
+ private final ClientT solrClient;
+ private final String username;
+ private final String password;
+
+ AuthorizedSolrClient(ClientT solrClient, ConnectionConfiguration configuration) {
+ checkArgument(
+ solrClient != null,
+ "solrClient can not be null");
+ checkArgument(
+ configuration != null,
+ "configuration can not be null");
+ this.solrClient = solrClient;
+ this.username = configuration.getUsername();
+ this.password = configuration.getPassword();
+ }
+
+ QueryResponse query(String collection, SolrParams solrParams)
+ throws IOException, SolrServerException {
+ QueryRequest query = new QueryRequest(solrParams);
+ return process(collection, query);
+ }
+
+ <ResponseT extends SolrResponse> ResponseT process(String collection,
+ SolrRequest<ResponseT> request) throws IOException, SolrServerException {
+ request.setBasicAuthCredentials(username, password);
+ return request.process(solrClient, collection);
+ }
+
+ CoreAdminResponse process(CoreAdminRequest request)
+ throws IOException, SolrServerException {
+ return process(null, request);
+ }
+
+ SolrResponse process(CollectionAdminRequest request)
+ throws IOException, SolrServerException {
+ return process(null, request);
+ }
+
+ static ClusterState getClusterState(
+ AuthorizedSolrClient<CloudSolrClient> authorizedSolrClient) {
+ authorizedSolrClient.solrClient.connect();
+ return authorizedSolrClient.solrClient.getZkStateReader().getClusterState();
+ }
+
+ @Override public void close() throws IOException {
+ solrClient.close();
+ }
+}
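As a side note, the per-request authentication that AuthorizedSolrClient.process()
relies on can be sketched with plain SolrJ as follows; the Zookeeper address,
credentials, and collection name are placeholders:

import java.io.IOException;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;

public class BasicAuthQueryExample {
  public static void main(String[] args) throws IOException, SolrServerException {
    try (CloudSolrClient client = new CloudSolrClient("localhost:9983")) {
      QueryRequest request = new QueryRequest(new SolrQuery("*:*"));
      // Credentials are attached per request rather than per client,
      // which is exactly what AuthorizedSolrClient.process() does above.
      request.setBasicAuthCredentials("solr", "SolrRocks");
      QueryResponse response = request.process(client, "my-collection");
      System.out.println(response.getResults().getNumFound());
    }
  }
}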
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java
new file mode 100644
index 0000000..aef3c4b
--- /dev/null
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.commons.compress.utils.BoundedInputStream;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.JavaBinCodec;
+
+/** A {@link Coder} that encodes using {@link JavaBinCodec}. */
+class JavaBinCodecCoder<T> extends AtomicCoder<T> {
+ private final Class<T> clazz;
+
+ private JavaBinCodecCoder(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+
+ public static <T> JavaBinCodecCoder<T> of(Class<T> clazz) {
+ return new JavaBinCodecCoder<>(clazz);
+ }
+
+ @Override
+ public void encode(T value, OutputStream outStream) throws IOException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null SolrDocument");
+ }
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ JavaBinCodec codec = new JavaBinCodec();
+ codec.marshal(value, baos);
+
+ byte[] bytes = baos.toByteArray();
+ VarInt.encode(bytes.length, outStream);
+ outStream.write(bytes);
+ }
+
+ @Override
+ public T decode(InputStream inStream) throws IOException {
+ DataInputStream in = new DataInputStream(inStream);
+
+ int len = VarInt.decodeInt(in);
+ if (len < 0) {
+ throw new CoderException("Invalid encoded SolrDocument length: " + len);
+ }
+
+ JavaBinCodec codec = new JavaBinCodec();
+ return (T) codec.unmarshal(new BoundedInputStream(in, len));
+ }
+
+ @Override
+ public TypeDescriptor<T> getEncodedTypeDescriptor() {
+ return TypeDescriptor.of(clazz);
+ }
+
+ @AutoService(CoderProviderRegistrar.class)
+ public static class Provider implements CoderProviderRegistrar {
+ @Override
+ public List<CoderProvider> getCoderProviders() {
+ return Arrays.asList(
+ CoderProviders.forCoder(
+ TypeDescriptor.of(SolrDocument.class), JavaBinCodecCoder.of(SolrDocument.class)),
+ CoderProviders.forCoder(
+ TypeDescriptor.of(SolrInputDocument.class),
+ JavaBinCodecCoder.of(SolrInputDocument.class)));
+ }
+ }
+}
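The coder frames each javabin payload with a VarInt length prefix so that decode()
never reads past its own value when several encoded documents share one stream. A
minimal round-trip sketch of that framing (not part of this commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.beam.sdk.util.VarInt;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.util.JavaBinCodec;

public class JavaBinFramingExample {
  public static void main(String[] args) throws IOException {
    SolrDocument doc = new SolrDocument();
    doc.put("id", "1");

    // Encode: marshal to javabin, then write a VarInt length prefix
    // followed by the payload, mirroring JavaBinCodecCoder.encode().
    ByteArrayOutputStream payload = new ByteArrayOutputStream();
    new JavaBinCodec().marshal(doc, payload);
    ByteArrayOutputStream framed = new ByteArrayOutputStream();
    VarInt.encode(payload.size(), framed);
    payload.writeTo(framed);

    // Decode: read the length, consume exactly that many bytes, unmarshal.
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(framed.toByteArray()));
    int len = VarInt.decodeInt(in);
    byte[] bytes = new byte[len];
    in.readFully(bytes);
    SolrDocument decoded =
        (SolrDocument) new JavaBinCodec().unmarshal(new ByteArrayInputStream(bytes));
    System.out.println(decoded.get("id")); // 1
  }
}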
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
new file mode 100644
index 0000000..c137eea
--- /dev/null
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CursorMarkParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+
+/**
+ * Transforms for reading and writing data from/to Solr.
+ *
+ * <h3>Reading from Solr</h3>
+ *
+ * <p>{@link SolrIO#read SolrIO.read()} returns a bounded {@link PCollection
+ * PCollection<SolrDocument>} representing Solr documents.
+ *
+ * <p>To configure the {@link SolrIO#read}, you have to provide a connection configuration
+ * containing the Zookeeper address of the Solr cluster, and the collection name. The following
+ * example illustrates options for configuring the source:
+ *
+ * <pre>{@code
+ * SolrIO.ConnectionConfiguration conn = SolrIO.ConnectionConfiguration.create("127.0.0.1:9983");
+ * // Optionally: .withBasicCredentials(username, password)
+ *
+ * PCollection<SolrDocument> docs = p.apply(
+ * SolrIO.read().from("my-collection").withConnectionConfiguration(conn));
+ *
+ * }</pre>
+ *
+ * <p>You can specify a query on the {@code read()} using {@code withQuery()}.
+ *
+ * <h3>Writing to Solr</h3>
+ *
+ * <p>To write documents to Solr, use {@link SolrIO#write SolrIO.write()}, which writes Solr
+ * documents from a {@link PCollection PCollection<SolrInputDocument>} (which can be bounded
+ * or unbounded).
+ *
+ * <p>To configure {@link SolrIO#write SolrIO.write()}, as with the read, you have to provide a
+ * connection configuration and a collection name. For instance:
+ *
+ * <pre>{@code
+ * PCollection<SolrInputDocument> inputDocs = ...;
+ * inputDocs.apply(SolrIO.write().to("my-collection").withConnectionConfiguration(conn));
+ *
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class SolrIO {
+
+ public static Read read() {
+ // A batch size of 1000 is good enough in many cases,
+ // e.g. if documents are large, around 10KB, the response's size will be around 10MB;
+ // if documents are small, around 1KB, the response's size will be around 1MB
+ return new AutoValue_SolrIO_Read.Builder().setBatchSize(1000).setQuery("*:*").build();
+ }
+
+ public static Write write() {
+ // A batch size of 1000 is good enough in many cases,
+ // e.g. if documents are large, around 10KB, the request's size will be around 10MB;
+ // if documents are small, around 1KB, the request's size will be around 1MB
+ return new AutoValue_SolrIO_Write.Builder().setMaxBatchSize(1000).build();
+ }
+
+ private SolrIO() {}
+
+ /** A POJO describing a connection configuration to Solr. */
+ @AutoValue
+ public abstract static class ConnectionConfiguration implements Serializable {
+
+ abstract String getZkHost();
+
+ @Nullable
+ abstract String getUsername();
+
+ @Nullable
+ abstract String getPassword();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setZkHost(String zkHost);
+
+ abstract Builder setUsername(String username);
+
+ abstract Builder setPassword(String password);
+
+ abstract ConnectionConfiguration build();
+ }
+
+ /**
+ * Creates a new Solr connection configuration.
+ *
+ * @param zkHost the Zookeeper host of the Solr cluster
+ * @return the connection configuration object
+ */
+ public static ConnectionConfiguration create(String zkHost) {
+ checkArgument(zkHost != null, "zkHost can not be null");
+ return new AutoValue_SolrIO_ConnectionConfiguration.Builder().setZkHost(zkHost).build();
+ }
+
+ /** If Solr basic authentication is enabled, provide the username and password. */
+ public ConnectionConfiguration withBasicCredentials(String username, String password) {
+ checkArgument(username != null, "username can not be null");
+ checkArgument(!username.isEmpty(), "username can not be empty");
+ checkArgument(password != null, "password can not be null");
+ checkArgument(!password.isEmpty(), "password can not be empty");
+ return builder().setUsername(username).setPassword(password).build();
+ }
+
+ private void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("zkHost", getZkHost()));
+ builder.addIfNotNull(DisplayData.item("username", getUsername()));
+ }
+
+ private HttpClient createHttpClient() {
+ // This works around a bug in Solr: if we don't create a customized HttpClient,
+ // an UpdateRequest with the commit flag will throw an authentication error.
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(HttpClientUtil.PROP_BASIC_AUTH_USER, getUsername());
+ params.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, getPassword());
+ return HttpClientUtil.createClient(params);
+ }
+
+ AuthorizedSolrClient<CloudSolrClient> createClient() throws MalformedURLException {
+ CloudSolrClient solrClient = new CloudSolrClient(getZkHost(), createHttpClient());
+ return new AuthorizedSolrClient<>(solrClient, this);
+ }
+
+ AuthorizedSolrClient<HttpSolrClient> createClient(String shardUrl) {
+ HttpSolrClient solrClient = new HttpSolrClient(shardUrl, createHttpClient());
+ return new AuthorizedSolrClient<>(solrClient, this);
+ }
+ }
+
+ /** A {@link PTransform} reading data from Solr. */
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<SolrDocument>> {
+ private static final long MAX_BATCH_SIZE = 10000L;
+
+ @Nullable
+ abstract ConnectionConfiguration getConnectionConfiguration();
+
+ @Nullable
+ abstract String getCollection();
+
+ abstract String getQuery();
+
+ abstract int getBatchSize();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
+
+ abstract Builder setQuery(String query);
+
+ abstract Builder setBatchSize(int batchSize);
+
+ abstract Builder setCollection(String collection);
+
+ abstract Read build();
+ }
+
+ /** Provide the Solr connection configuration object. */
+ public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
+ checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
+ return builder().setConnectionConfiguration(connectionConfiguration).build();
+ }
+
+ /**
+ * Provide the name of the Solr collection to read from.
+ *
+ * @param collection the collection toward which the requests will be issued
+ */
+ public Read from(String collection) {
+ checkArgument(collection != null, "collection can not be null");
+ return builder().setCollection(collection).build();
+ }
+
+ /**
+ * Provide a query used while reading from Solr.
+ *
+ * @param query the query. See <a
+ * href="https://cwiki.apache.org/confluence/display/solr/The+Standard+Query+Parser">Solr
+ * Query </a>
+ */
+ public Read withQuery(String query) {
+ checkArgument(query != null, "query can not be null");
+ checkArgument(!query.isEmpty(), "query can not be empty");
+ return builder().setQuery(query).build();
+ }
+
+ /**
+ * Provide a size for the cursor read. See the <a
+ * href="https://cwiki.apache.org/confluence/display/solr/Pagination+of+Results">cursor API</a>.
+ * Default is 1000. Maximum is 10,000 (exclusive). If documents are small, increasing the batch
+ * size might improve read performance; if documents are large, you might need to decrease it.
+ *
+ * @param batchSize number of documents read in each scroll read
+ */
+ @VisibleForTesting
+ Read withBatchSize(int batchSize) {
+ // TODO remove this configuration, we can figure out the best number
+ // by tuning batchSize when pipelines run.
+ checkArgument(
+ batchSize > 0 && batchSize < MAX_BATCH_SIZE,
+ "Valid values for batchSize are 1 (inclusize) to %s (exclusive), but was: %s ",
+ MAX_BATCH_SIZE,
+ batchSize);
+ return builder().setBatchSize(batchSize).build();
+ }
+
+ @Override
+ public PCollection<SolrDocument> expand(PBegin input) {
+ return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedSolrSource(this, null)));
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ checkState(
+ getConnectionConfiguration() != null,
+ "Need to set connection configuration using withConnectionConfiguration()");
+ checkState(getCollection() != null, "Need to set collection name using to()");
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.addIfNotNull(DisplayData.item("query", getQuery()));
+ getConnectionConfiguration().populateDisplayData(builder);
+ }
+ }
+
+ /** A POJO describing a replica of Solr. */
+ @AutoValue
+ abstract static class ReplicaInfo implements Serializable {
+ public abstract String coreName();
+
+ public abstract String coreUrl();
+
+ public abstract String baseUrl();
+
+ static ReplicaInfo create(Replica replica) {
+ return new AutoValue_SolrIO_ReplicaInfo(
+ replica.getStr(ZkStateReader.CORE_NAME_PROP),
+ replica.getCoreUrl(),
+ replica.getStr(ZkStateReader.BASE_URL_PROP));
+ }
+ }
+
+ /** A {@link BoundedSource} reading from Solr. */
+ @VisibleForTesting
+ static class BoundedSolrSource extends BoundedSource<SolrDocument> {
+
+ private final SolrIO.Read spec;
+ // replica is the info of the shard where the source will read the documents
+ @Nullable private final ReplicaInfo replica;
+
+ BoundedSolrSource(Read spec, @Nullable Replica replica) {
+ this.spec = spec;
+ this.replica = replica == null ? null : ReplicaInfo.create(replica);
+ }
+
+ @Override
+ public List<? extends BoundedSource<SolrDocument>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ ConnectionConfiguration connectionConfig = spec.getConnectionConfiguration();
+ List<BoundedSolrSource> sources = new ArrayList<>();
+ try (AuthorizedSolrClient<CloudSolrClient> client = connectionConfig.createClient()) {
+ String collection = spec.getCollection();
+ final ClusterState clusterState = AuthorizedSolrClient.getClusterState(client);
+ DocCollection docCollection = clusterState.getCollection(collection);
+ for (Slice slice : docCollection.getSlices()) {
+ ArrayList<Replica> replicas = new ArrayList<>(slice.getReplicas());
+ Collections.shuffle(replicas);
+ // Load balancing by randomly picking an active replica
+ Replica randomActiveReplica = null;
+ for (Replica replica : replicas) {
+ // We need to check both state of the replica and live nodes
+ // to make sure that the replica is alive
+ if (replica.getState() == Replica.State.ACTIVE
+ && clusterState.getLiveNodes().contains(replica.getNodeName())) {
+ randomActiveReplica = replica;
+ break;
+ }
+ }
+ // TODO: if this replica goes inactive while the pipeline runs,
+ // we should pick another active replica of this shard.
+ checkState(
+ randomActiveReplica != null,
+ "Can not find an active replica for slice %s",
+ slice.getName());
+ sources.add(new BoundedSolrSource(spec, randomActiveReplica));
+ }
+ }
+ return sources;
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
+ if (replica != null) {
+ return getEstimatedSizeOfShard(replica);
+ } else {
+ return getEstimatedSizeOfCollection();
+ }
+ }
+
+ private long getEstimatedSizeOfShard(ReplicaInfo replica) throws IOException {
+ try (AuthorizedSolrClient solrClient =
+ spec.getConnectionConfiguration().createClient(replica.baseUrl())) {
+ CoreAdminRequest req = new CoreAdminRequest();
+ req.setAction(CoreAdminParams.CoreAdminAction.STATUS);
+ req.setIndexInfoNeeded(true);
+ CoreAdminResponse response;
+ try {
+ response = solrClient.process(req);
+ } catch (SolrServerException e) {
+ throw new IOException("Can not get core status from " + replica, e);
+ }
+ NamedList<Object> coreStatus = response.getCoreStatus(replica.coreName());
+ NamedList<Object> indexStats = (NamedList<Object>) coreStatus.get("index");
+ return (long) indexStats.get("sizeInBytes");
+ }
+ }
+
+ private long getEstimatedSizeOfCollection() throws IOException {
+ long sizeInBytes = 0;
+ ConnectionConfiguration config = spec.getConnectionConfiguration();
+ try (AuthorizedSolrClient<CloudSolrClient> solrClient = config.createClient()) {
+ DocCollection docCollection =
+ AuthorizedSolrClient.getClusterState(solrClient).getCollection(spec.getCollection());
+ if (docCollection.getSlices().isEmpty()) {
+ return 0;
+ }
+
+ ArrayList<Slice> slices = new ArrayList<>(docCollection.getSlices());
+ Collections.shuffle(slices);
+ ExecutorService executor =
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setThreadFactory(MoreExecutors.platformThreadFactory())
+ .setDaemon(true)
+ .setNameFormat("solrio-size-of-collection-estimation")
+ .build());
+ try {
+ ArrayList<Future<Long>> futures = new ArrayList<>();
+ for (int i = 0; i < 100 && i < slices.size(); i++) {
+ Slice slice = slices.get(i);
+ final Replica replica = slice.getLeader();
+ Future<Long> future =
+ executor.submit(
+ new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ return getEstimatedSizeOfShard(ReplicaInfo.create(replica));
+ }
+ });
+ futures.add(future);
+ }
+ for (Future<Long> future : futures) {
+ try {
+ sizeInBytes += future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException("Can not estimate size of shard", e.getCause());
+ }
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+
+ if (slices.size() <= 100) {
+ return sizeInBytes;
+ }
+ return (sizeInBytes / 100) * slices.size();
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ spec.populateDisplayData(builder);
+ if (replica != null) {
+ builder.addIfNotNull(DisplayData.item("shardUrl", replica.coreUrl()));
+ }
+ }
+
+ @Override
+ public BoundedReader<SolrDocument> createReader(PipelineOptions options) throws IOException {
+ return new BoundedSolrReader(this);
+ }
+
+ @Override
+ public void validate() {
+ spec.validate(null);
+ }
+
+ @Override
+ public Coder<SolrDocument> getOutputCoder() {
+ return JavaBinCodecCoder.of(SolrDocument.class);
+ }
+ }
+
+ private static class BoundedSolrReader extends BoundedSource.BoundedReader<SolrDocument> {
+
+ private final BoundedSolrSource source;
+
+ private AuthorizedSolrClient solrClient;
+ private SolrDocument current;
+ private String cursorMark;
+ private Iterator<SolrDocument> batchIterator;
+ private boolean done;
+ private String uniqueKey;
+
+ private BoundedSolrReader(BoundedSolrSource source) {
+ this.source = source;
+ this.cursorMark = CursorMarkParams.CURSOR_MARK_START;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ if (source.replica != null) {
+ solrClient =
+ source.spec.getConnectionConfiguration().createClient(source.replica.baseUrl());
+ } else {
+ solrClient = source.spec.getConnectionConfiguration().createClient();
+ }
+ SchemaRequest.UniqueKey uniqueKeyRequest = new SchemaRequest.UniqueKey();
+ try {
+ String collection = source.spec.getCollection();
+ SchemaResponse.UniqueKeyResponse uniqueKeyResponse =
+ (SchemaResponse.UniqueKeyResponse) solrClient.process(collection, uniqueKeyRequest);
+ uniqueKey = uniqueKeyResponse.getUniqueKey();
+ } catch (SolrServerException e) {
+ throw new IOException("Can not get unique key from solr", e);
+ }
+ return advance();
+ }
+
+ private SolrQuery getQueryParams(BoundedSolrSource source) {
+ String query = source.spec.getQuery();
+ if (query == null) {
+ query = "*:*";
+ }
+ SolrQuery solrQuery = new SolrQuery(query);
+ solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+ solrQuery.setRows(source.spec.getBatchSize());
+ solrQuery.addSort(uniqueKey, SolrQuery.ORDER.asc);
+ if (source.replica != null) {
+ solrQuery.setDistrib(false);
+ }
+ return solrQuery;
+ }
+
+ private void updateCursorMark(QueryResponse response) {
+ if (cursorMark.equals(response.getNextCursorMark())) {
+ done = true;
+ }
+ cursorMark = response.getNextCursorMark();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (batchIterator != null && batchIterator.hasNext()) {
+ current = batchIterator.next();
+ return true;
+ } else {
+ SolrQuery solrQuery = getQueryParams(source);
+ try {
+ QueryResponse response;
+ if (source.replica != null) {
+ response = solrClient.query(source.replica.coreName(), solrQuery);
+ } else {
+ response = solrClient.query(source.spec.getCollection(), solrQuery);
+ }
+ updateCursorMark(response);
+ return readNextBatchAndReturnFirstDocument(response);
+ } catch (SolrServerException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private boolean readNextBatchAndReturnFirstDocument(QueryResponse response) {
+ if (done) {
+ current = null;
+ batchIterator = null;
+ return false;
+ }
+
+ batchIterator = response.getResults().iterator();
+ current = batchIterator.next();
+ return true;
+ }
+
+ @Override
+ public SolrDocument getCurrent() throws NoSuchElementException {
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ return current;
+ }
+
+ @Override
+ public void close() throws IOException {
+ solrClient.close();
+ }
+
+ @Override
+ public BoundedSource<SolrDocument> getCurrentSource() {
+ return source;
+ }
+ }
+
+ /** A {@link PTransform} writing data to Solr. */
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<SolrInputDocument>, PDone> {
+
+ @Nullable
+ abstract ConnectionConfiguration getConnectionConfiguration();
+
+ @Nullable
+ abstract String getCollection();
+
+ abstract int getMaxBatchSize();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
+
+ abstract Builder setCollection(String collection);
+
+ abstract Builder setMaxBatchSize(int maxBatchSize);
+
+ abstract Write build();
+ }
+
+ /** Provide the Solr connection configuration object. */
+ public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
+ checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
+ return builder().setConnectionConfiguration(connectionConfiguration).build();
+ }
+
+ /**
+ * Provide the name of the Solr collection to write to.
+ *
+ * @param collection the collection toward which the requests will be issued
+ */
+ public Write to(String collection) {
+ checkArgument(collection != null, "collection can not be null");
+ return builder().setCollection(collection).build();
+ }
+
+ /**
+ * Provide a maximum batch size in number of documents. Bundle sizes vary depending on the
+ * execution engine; this caps how many documents are sent per update request. Lower it if you
+ * need smaller batches.
+ *
+ * @param batchSize maximum batch size in number of documents
+ */
+ @VisibleForTesting
+ Write withMaxBatchSize(int batchSize) {
+ // TODO remove this configuration, we can figure out the best number
+ // by tuning batchSize when pipelines run.
+ checkArgument(batchSize > 0, "batchSize must be larger than 0, but was: %s", batchSize);
+ return builder().setMaxBatchSize(batchSize).build();
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ checkState(
+ getConnectionConfiguration() != null,
+ "Need to set connection configuration via withConnectionConfiguration()");
+ checkState(getCollection() != null, "Need to set collection name via to()");
+ }
+
+ @Override
+ public PDone expand(PCollection<SolrInputDocument> input) {
+ input.apply(ParDo.of(new WriteFn(this)));
+ return PDone.in(input.getPipeline());
+ }
+
+ @VisibleForTesting
+ static class WriteFn extends DoFn<SolrInputDocument, Void> {
+
+ private final Write spec;
+
+ private transient AuthorizedSolrClient solrClient;
+ private Collection<SolrInputDocument> batch;
+
+ WriteFn(Write spec) {
+ this.spec = spec;
+ }
+
+ @Setup
+ public void createClient() throws Exception {
+ solrClient = spec.getConnectionConfiguration().createClient();
+ }
+
+ @StartBundle
+ public void startBundle(StartBundleContext context) throws Exception {
+ batch = new ArrayList<>();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) throws Exception {
+ SolrInputDocument document = context.element();
+ batch.add(document);
+ if (batch.size() >= spec.getMaxBatchSize()) {
+ flushBatch();
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext context) throws Exception {
+ flushBatch();
+ }
+
+ private void flushBatch() throws IOException {
+ if (batch.isEmpty()) {
+ return;
+ }
+ try {
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.add(batch);
+ solrClient.process(spec.getCollection(), updateRequest);
+ } catch (SolrServerException e) {
+ throw new IOException("Error writing to Solr", e);
+ } finally {
+ batch.clear();
+ }
+ }
+
+ @Teardown
+ public void closeClient() throws Exception {
+ if (solrClient != null) {
+ solrClient.close();
+ }
+ }
+ }
+ }
+}
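Putting the documented read and write together, a hypothetical end-to-end pipeline
that copies one collection into another could look like the sketch below; the
Zookeeper address and collection names are placeholders:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.solr.SolrIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;

public class SolrCopyPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    SolrIO.ConnectionConfiguration conn =
        SolrIO.ConnectionConfiguration.create("localhost:9983");
    p.apply(SolrIO.read().withConnectionConfiguration(conn).from("source-collection"))
        .apply(ParDo.of(new DoFn<SolrDocument, SolrInputDocument>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Copy every stored field into an input document.
            SolrInputDocument out = new SolrInputDocument();
            for (String field : c.element().getFieldNames()) {
              if (!"_version_".equals(field)) { // drop Solr's internal version field
                out.addField(field, c.element().getFieldValue(field));
              }
            }
            c.output(out);
          }
        }))
        .apply(SolrIO.write().withConnectionConfiguration(conn).to("target-collection"));
    p.run().waitUntilFinish();
  }
}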
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java
new file mode 100644
index 0000000..83867ed
--- /dev/null
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing from/to Solr. */
+package org.apache.beam.sdk.io.solr;
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java
new file mode 100644
index 0000000..1fb435d
--- /dev/null
+++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.solr.common.SolrDocument;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test case for {@link JavaBinCodecCoder}. */
+@RunWith(JUnit4.class)
+public class JavaBinCodecCoderTest {
+ private static final Coder<SolrDocument> TEST_CODER = JavaBinCodecCoder.of(SolrDocument.class);
+ private static final List<SolrDocument> TEST_VALUES = new ArrayList<>();
+
+ static {
+ SolrDocument doc = new SolrDocument();
+ doc.put("id", "1");
+ doc.put("content", "wheel on the bus");
+ doc.put("_version_", 1573597324260671488L);
+ TEST_VALUES.add(doc);
+
+ doc = new SolrDocument();
+ doc.put("id", "2");
+ doc.put("content", "goes round and round");
+ doc.put("_version_", 1573597324260671489L);
+ TEST_VALUES.add(doc);
+ }
+
+ @Test
+ public void testDecodeEncodeEqual() throws Exception {
+ for (SolrDocument value : TEST_VALUES) {
+ CoderProperties.coderDecodeEncodeContentsInSameOrder(TEST_CODER, value);
+ CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value);
+ }
+ }
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void encodeNullThrowsCoderException() throws Exception {
+ thrown.expect(CoderException.class);
+ thrown.expectMessage("cannot encode a null SolrDocument");
+
+ CoderUtils.encodeToBase64(TEST_CODER, null);
+ }
+
+ @Test
+ public void testEncodedTypeDescriptor() throws Exception {
+ assertThat(
+ TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(SolrDocument.class)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
new file mode 100644
index 0000000..4358ce4
--- /dev/null
+++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import com.google.common.io.BaseEncoding;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.security.Sha256AuthenticationProvider;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A test of {@link SolrIO} on an independent Solr instance. */
+@ThreadLeakScope(value = ThreadLeakScope.Scope.NONE)
+@SolrTestCaseJ4.SuppressSSL
+public class SolrIOTest extends SolrCloudTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(SolrIOTest.class);
+
+ private static final String SOLR_COLLECTION = "beam";
+ private static final int NUM_SHARDS = 3;
+ private static final long NUM_DOCS = 400L;
+ private static final int NUM_SCIENTISTS = 10;
+ private static final int BATCH_SIZE = 200;
+
+ private static AuthorizedSolrClient<CloudSolrClient> solrClient;
+ private static SolrIO.ConnectionConfiguration connectionConfiguration;
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // set up credentials for the solr user,
+ // See https://cwiki.apache.org/confluence/display/solr/Basic+Authentication+Plugin
+ String password = "SolrRocks";
+ // salt's size can be arbitrary
+ byte[] salt = new byte[random().nextInt(30) + 1];
+ random().nextBytes(salt);
+ String base64Salt = BaseEncoding.base64().encode(salt);
+ String sha256 = Sha256AuthenticationProvider.sha256(password, base64Salt);
+ String credential = sha256 + " " + base64Salt;
+ String securityJson =
+ "{"
+ + "'authentication':{"
+ + " 'blockUnknown': true,"
+ + " 'class':'solr.BasicAuthPlugin',"
+ + " 'credentials':{'solr':'"
+ + credential
+ + "'}}"
+ + "}";
+
+ configureCluster(3).addConfig("conf", getFile("cloud-minimal/conf").toPath()).configure();
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ zkStateReader
+ .getZkClient()
+ .setData("/security.json", securityJson.getBytes(Charset.defaultCharset()), true);
+ String zkAddress = cluster.getZkServer().getZkAddress();
+ connectionConfiguration =
+ SolrIO.ConnectionConfiguration.create(zkAddress).withBasicCredentials("solr", password);
+ solrClient = connectionConfiguration.createClient();
+ SolrIOTestUtils.createCollection(SOLR_COLLECTION, NUM_SHARDS, 1, solrClient);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ solrClient.close();
+ }
+
+ @Before
+ public void before() throws Exception {
+ SolrIOTestUtils.clearCollection(SOLR_COLLECTION, solrClient);
+ }
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testBadCredentials() throws IOException {
+ thrown.expect(SolrException.class);
+
+ String zkAddress = cluster.getZkServer().getZkAddress();
+ SolrIO.ConnectionConfiguration connectionConfiguration =
+ SolrIO.ConnectionConfiguration.create(zkAddress)
+ .withBasicCredentials("solr", "wrongpassword");
+ try (AuthorizedSolrClient solrClient = connectionConfiguration.createClient()) {
+ SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+ }
+ }
+
+ @Test
+ public void testSizes() throws Exception {
+ SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ SolrIO.Read read =
+ SolrIO.read().withConnectionConfiguration(connectionConfiguration).from(SOLR_COLLECTION);
+ SolrIO.BoundedSolrSource initialSource = new SolrIO.BoundedSolrSource(read, null);
+ // can't use equal assert as Solr collections never have same size
+ // (due to internal Lucene implementation)
+ long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+ LOG.info("Estimated size: {}", estimatedSize);
+ assertThat(
+ "Wrong estimated size bellow minimum",
+ estimatedSize,
+ greaterThan(SolrIOTestUtils.MIN_DOC_SIZE * NUM_DOCS));
+ assertThat(
+ "Wrong estimated size beyond maximum",
+ estimatedSize,
+ lessThan(SolrIOTestUtils.MAX_DOC_SIZE * NUM_DOCS));
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+
+ PCollection<SolrDocument> output =
+ pipeline.apply(
+ SolrIO.read()
+ .withConnectionConfiguration(connectionConfiguration)
+ .from(SOLR_COLLECTION)
+ .withBatchSize(101));
+ PAssert.thatSingleton(output.apply("Count", Count.<SolrDocument>globally()))
+ .isEqualTo(NUM_DOCS);
+ pipeline.run();
+ }
+
+ @Test
+ public void testReadWithQuery() throws Exception {
+ SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+
+ PCollection<SolrDocument> output =
+ pipeline.apply(
+ SolrIO.read()
+ .withConnectionConfiguration(connectionConfiguration)
+ .from(SOLR_COLLECTION)
+ .withQuery("scientist:Franklin"));
+ PAssert.thatSingleton(output.apply("Count", Count.<SolrDocument>globally()))
+ .isEqualTo(NUM_DOCS / NUM_SCIENTISTS);
+ pipeline.run();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ List<SolrInputDocument> data = SolrIOTestUtils.createDocuments(NUM_DOCS);
+ SolrIO.Write write =
+ SolrIO.write().withConnectionConfiguration(connectionConfiguration).to(SOLR_COLLECTION);
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ long currentNumDocs = SolrIOTestUtils.commitAndGetCurrentNumDocs(SOLR_COLLECTION, solrClient);
+ assertEquals(NUM_DOCS, currentNumDocs);
+
+ QueryResponse response = solrClient.query(SOLR_COLLECTION, new SolrQuery("scientist:Lovelace"));
+ assertEquals(NUM_DOCS / NUM_SCIENTISTS, response.getResults().getNumFound());
+ }
+
+ @Test
+ public void testWriteWithMaxBatchSize() throws Exception {
+ SolrIO.Write write =
+ SolrIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .to(SOLR_COLLECTION)
+ .withMaxBatchSize(BATCH_SIZE);
+ // bundle size is the runner's decision, and we cannot force a bundle size,
+ // so we test the writer as a DoFn outside of a runner.
+ try (DoFnTester<SolrInputDocument, Void> fnTester =
+ DoFnTester.of(new SolrIO.Write.WriteFn(write))) {
+ List<SolrInputDocument> input = SolrIOTestUtils.createDocuments(NUM_DOCS);
+ long numDocsProcessed = 0;
+ long numDocsInserted = 0;
+ for (SolrInputDocument document : input) {
+ fnTester.processElement(document);
+ numDocsProcessed++;
+ // test every 100 docs to avoid overloading Solr
+ if ((numDocsProcessed % 100) == 0) {
+ // force a commit after inserting so that the inserted docs
+ // are searchable immediately
+ long currentNumDocs =
+ SolrIOTestUtils.commitAndGetCurrentNumDocs(SOLR_COLLECTION, solrClient);
+ if ((numDocsProcessed % BATCH_SIZE) == 0) {
+ /* bundle end */
+ assertEquals(
+ "we are at the end of a bundle, we should have inserted all processed documents",
+ numDocsProcessed,
+ currentNumDocs);
+ numDocsInserted = currentNumDocs;
+ } else {
+ /* not bundle end */
+ assertEquals(
+ "we are not at the end of a bundle, we should have inserted no more documents",
+ numDocsInserted,
+ currentNumDocs);
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSplit() throws Exception {
+ SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ SolrIO.Read read =
+ SolrIO.read().withConnectionConfiguration(connectionConfiguration).from(SOLR_COLLECTION);
+ SolrIO.BoundedSolrSource initialSource = new SolrIO.BoundedSolrSource(read, null);
+ // desiredBundleSize is ignored for now
+ int desiredBundleSizeBytes = 0;
+ List<? extends BoundedSource<SolrDocument>> splits =
+ initialSource.split(desiredBundleSizeBytes, options);
+ SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
+
+ int expectedNumSplits = NUM_SHARDS;
+ assertEquals(expectedNumSplits, splits.size());
+ int nonEmptySplits = 0;
+ for (BoundedSource<SolrDocument> subSource : splits) {
+ if (readFromSource(subSource, options).size() > 0) {
+ nonEmptySplits += 1;
+ }
+ }
+ // docs are hashed by id to shards; in this test NUM_DOCS >> NUM_SHARDS,
+ // therefore no shard can be empty.
+ assertEquals("Wrong number of empty splits", expectedNumSplits, nonEmptySplits);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
new file mode 100644
index 0000000..808cd0f
--- /dev/null
+++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+
+/** Test utilities to use with {@link SolrIO}. */
+public class SolrIOTestUtils {
+ public static final long MIN_DOC_SIZE = 40L;
+ public static final long MAX_DOC_SIZE = 90L;
+
+ static void createCollection(
+ String collection, int numShards, int replicationFactor, AuthorizedSolrClient client)
+ throws Exception {
+ CollectionAdminRequest.Create create =
+ new CollectionAdminRequest.Create()
+ .setCollectionName(collection)
+ .setNumShards(numShards)
+ .setReplicationFactor(replicationFactor)
+ .setMaxShardsPerNode(2);
+ client.process(create);
+ }
+
+ /** Inserts the given number of test documents into Solr. */
+ static void insertTestDocuments(String collection, long numDocs, AuthorizedSolrClient client)
+ throws IOException {
+ List<SolrInputDocument> data = createDocuments(numDocs);
+ try {
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+ updateRequest.add(data);
+ client.process(collection, updateRequest);
+ } catch (SolrServerException e) {
+ throw new IOException("Failed to insert test documents to collection", e);
+ }
+ }
+
+ /** Deletes the given collection. */
+ static void deleteCollection(String collection, AuthorizedSolrClient client) throws IOException {
+ try {
+ CollectionAdminRequest.Delete delete =
+ new CollectionAdminRequest.Delete().setCollectionName(collection);
+ client.process(delete);
+ } catch (SolrServerException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** Deletes all documents in the given collection. */
+ static void clearCollection(String collection, AuthorizedSolrClient client) throws IOException {
+ try {
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+ updateRequest.deleteByQuery("*:*");
+ client.process(collection, updateRequest);
+ } catch (SolrServerException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Forces a commit of the given collection to make recently inserted documents available for
+ * search.
+ *
+ * @return The number of docs in the index
+ */
+ static long commitAndGetCurrentNumDocs(String collection, AuthorizedSolrClient client)
+ throws IOException {
+ SolrQuery solrQuery = new SolrQuery("*:*");
+ solrQuery.setRows(0);
+ try {
+ UpdateRequest update = new UpdateRequest();
+ update.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+ client.process(collection, update);
+
+ return client.query(collection, solrQuery).getResults().getNumFound();
+ } catch (SolrServerException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Generates a list of test documents for insertion.
+ *
+ * @return the list of {@link SolrInputDocument}s representing the documents
+ */
+ static List<SolrInputDocument> createDocuments(long numDocs) {
+ String[] scientists = {
+ "Lovelace",
+ "Franklin",
+ "Meitner",
+ "Hopper",
+ "Curie",
+ "Faraday",
+ "Newton",
+ "Bohr",
+ "Galilei",
+ "Maxwell"
+ };
+ ArrayList<SolrInputDocument> data = new ArrayList<>();
+ for (int i = 0; i < numDocs; i++) {
+ int index = i % scientists.length;
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.setField("id", String.valueOf(i));
+ doc.setField("scientist", scientists[index]);
+ data.add(doc);
+ }
+ return data;
+ }
+}
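Taken together, these helpers cover the full collection lifecycle a test needs. A minimal sketch of how they compose, assuming an AuthorizedSolrClient named solrClient is already connected (illustrative only, not part of this commit):

  String collection = "beam";
  SolrIOTestUtils.createCollection(collection, /* numShards= */ 2, /* replicationFactor= */ 1, solrClient);
  SolrIOTestUtils.insertTestDocuments(collection, 400, solrClient);
  long numDocs = SolrIOTestUtils.commitAndGetCurrentNumDocs(collection, solrClient);
  SolrIOTestUtils.clearCollection(collection, solrClient);
  SolrIOTestUtils.deleteCollection(collection, solrClient);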
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..08a1716
--- /dev/null
+++ b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <fieldType name="string" class="solr.StrField"/>
+ <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+ <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+ <dynamicField name="*" type="string" indexed="true" stored="true"/>
+ <!-- for versioning -->
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+ <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+ <field name="id" type="string" indexed="true" stored="true"/>
+ <dynamicField name="*_s" type="string" indexed="true" stored="true" />
+ <uniqueKey>id</uniqueKey>
+</schema>
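For reference, the test documents produced by SolrIOTestUtils.createDocuments fit this schema without any extra configuration: "id" matches the explicit uniqueKey field, and "scientist" is caught by the catch-all dynamicField. A small illustrative snippet (not code from this commit):

  SolrInputDocument doc = new SolrInputDocument();
  doc.setField("id", "1");            // explicit <field name="id">, the uniqueKey
  doc.setField("scientist", "Curie"); // matched by <dynamicField name="*">, indexed as string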
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..8da7d28
--- /dev/null
+++ b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <commitWithin>
+ <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+ <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+ </updateHandler>
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+
+ </requestHandler>
+</config>
+
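The updateHandler above executes commitWithin requests as soft commits (solr.commitwithin.softcommit defaults to true), which is what lets the tests make freshly written documents searchable without forcing a full hard commit. A hedged SolrJ sketch of relying on that setting; the client and collection names here are assumptions, not code from this commit:

  UpdateRequest request = new UpdateRequest();
  request.add(doc);
  request.setCommitWithin(1000); // milliseconds; executed as a soft commit per the config above
  request.process(solrClient, "beam");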
http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index e1adb79..1fb6e41 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -172,6 +172,11 @@
<artifactId>beam-sdks-java-io-mqtt</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-solr</artifactId>
+ </dependency>
+
<!-- provided and optional dependencies.-->
<dependency>
<groupId>com.google.auto.service</groupId>
[2/2] beam git commit: This closes #3618: [BEAM-2657] Create Solr IO
Posted by jk...@apache.org.
This closes #3618: [BEAM-2657] Create Solr IO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5714f22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5714f22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5714f22
Branch: refs/heads/master
Commit: f5714f2203974a0f97f778d62a82678025bd425e
Parents: 2fa4fde d00ff9e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 8 11:54:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 8 11:54:25 2017 -0700
----------------------------------------------------------------------