Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/08 19:01:19 UTC

[1/2] beam git commit: [BEAM-2657] Create Solr IO

Repository: beam
Updated Branches:
  refs/heads/master 2fa4fdecd -> f5714f220


[BEAM-2657] Create Solr IO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d00ff9e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d00ff9e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d00ff9e2

Branch: refs/heads/master
Commit: d00ff9e215092af2666459b742f62c6b0bb4bff9
Parents: 2fa4fde
Author: Cao Manh Dat <da...@apache.org>
Authored: Sat Jul 22 14:38:07 2017 +0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 8 11:54:09 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   8 +-
 .../sdk/io/common/IOTestPipelineOptions.java    |   6 +
 sdks/java/io/pom.xml                            |   1 +
 sdks/java/io/solr/pom.xml                       | 147 ++++
 .../beam/sdk/io/solr/AuthorizedSolrClient.java  |  91 +++
 .../beam/sdk/io/solr/JavaBinCodecCoder.java     |  98 +++
 .../org/apache/beam/sdk/io/solr/SolrIO.java     | 717 +++++++++++++++++++
 .../apache/beam/sdk/io/solr/package-info.java   |  20 +
 .../beam/sdk/io/solr/JavaBinCodecCoderTest.java |  81 +++
 .../org/apache/beam/sdk/io/solr/SolrIOTest.java | 269 +++++++
 .../beam/sdk/io/solr/SolrIOTestUtils.java       | 132 ++++
 .../resources/cloud-minimal/conf/schema.xml     |  29 +
 .../resources/cloud-minimal/conf/solrconfig.xml |  48 ++
 sdks/java/javadoc/pom.xml                       |   5 +
 14 files changed, 1651 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 80ab6e2..1bdaa97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -524,7 +524,13 @@
         <version>${project.version}</version>
       </dependency>
 
-	  <dependency>
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-solr</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
 	    <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 25ab929..256c94d 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -71,6 +71,12 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   Integer getElasticsearchHttpPort();
   void setElasticsearchHttpPort(Integer value);
 
+  /* Solr */
+  @Description("Address of Zookeeper server for Solr")
+  @Default.String("zookeeper-server")
+  String getSolrZookeeperServer();
+  void setSolrZookeeperServer(String value);
+
   /* Cassandra */
   @Description("Host for Cassandra server (host name/ip address)")
   @Default.String("cassandra-host")

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 4e02aa8..c291e5d 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -56,6 +56,7 @@
     <module>kinesis</module>
     <module>mongodb</module>
     <module>mqtt</module>
+    <module>solr</module>
     <module>xml</module>
   </modules>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/pom.xml b/sdks/java/io/solr/pom.xml
new file mode 100644
index 0000000..a757a57
--- /dev/null
+++ b/sdks/java/io/solr/pom.xml
@@ -0,0 +1,147 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>beam-sdks-java-io-parent</artifactId>
+        <groupId>org.apache.beam</groupId>
+        <version>2.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>beam-sdks-java-io-solr</artifactId>
+    <name>Apache Beam :: SDKs :: Java :: IO :: Solr</name>
+    <description>IO to read and write from/to Solr.</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-solrj</artifactId>
+            <version>5.5.4</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+        </dependency>
+
+        <!-- compile dependencies -->
+        <dependency>
+            <groupId>com.google.auto.value</groupId>
+            <artifactId>auto-value</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.4.1</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-common</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-direct-java</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-test-framework</artifactId>
+            <version>5.5.4</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-core</artifactId>
+            <version>5.5.4</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.carrotsearch.randomizedtesting</groupId>
+            <artifactId>randomizedtesting-runner</artifactId>
+            <version>2.3.2</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java
new file mode 100644
index 0000000..44d7b88
--- /dev/null
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.beam.sdk.io.solr.SolrIO.ConnectionConfiguration;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.params.SolrParams;
+
+/**
+ * Client for interacting with Solr.
+ * @param <ClientT> type of SolrClient
+ */
+class AuthorizedSolrClient<ClientT extends SolrClient> implements Closeable {
+  private final ClientT solrClient;
+  private final String username;
+  private final String password;
+
+  AuthorizedSolrClient(ClientT solrClient, ConnectionConfiguration configuration) {
+    checkArgument(
+        solrClient != null,
+        "solrClient can not be null");
+    checkArgument(
+        configuration != null,
+        "configuration can not be null");
+    this.solrClient = solrClient;
+    this.username = configuration.getUsername();
+    this.password = configuration.getPassword();
+  }
+
+  QueryResponse query(String collection, SolrParams solrParams)
+      throws IOException, SolrServerException {
+    QueryRequest query = new QueryRequest(solrParams);
+    return process(collection, query);
+  }
+
+  <ResponseT extends SolrResponse> ResponseT process(String collection,
+      SolrRequest<ResponseT> request) throws IOException, SolrServerException {
+    request.setBasicAuthCredentials(username, password);
+    return request.process(solrClient, collection);
+  }
+
+  CoreAdminResponse process(CoreAdminRequest request)
+      throws IOException, SolrServerException {
+    return process(null, request);
+  }
+
+  SolrResponse process(CollectionAdminRequest request)
+      throws IOException, SolrServerException {
+    return process(null, request);
+  }
+
+  static ClusterState getClusterState(
+      AuthorizedSolrClient<CloudSolrClient> authorizedSolrClient) {
+    authorizedSolrClient.solrClient.connect();
+    return authorizedSolrClient.solrClient.getZkStateReader().getClusterState();
+  }
+
+  @Override public void close() throws IOException {
+    solrClient.close();
+  }
+}
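
In effect, the wrapper attaches the configured basic-auth credentials to every SolrJ request before processing it. A minimal usage sketch, mirroring how the tests exercise it (collection name and query are illustrative):

    try (AuthorizedSolrClient<CloudSolrClient> client = connectionConfiguration.createClient()) {
      // query() wraps the params in a QueryRequest and sets basic-auth credentials on it
      QueryResponse response = client.query("my-collection", new SolrQuery("*:*"));
    }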

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java
new file mode 100644
index 0000000..aef3c4b
--- /dev/null
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.commons.compress.utils.BoundedInputStream;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.JavaBinCodec;
+
+/** A {@link Coder} that encodes using {@link JavaBinCodec}. */
+class JavaBinCodecCoder<T> extends AtomicCoder<T> {
+  private final Class<T> clazz;
+
+  private JavaBinCodecCoder(Class<T> clazz) {
+    this.clazz = clazz;
+  }
+
+  public static <T> JavaBinCodecCoder<T> of(Class<T> clazz) {
+    return new JavaBinCodecCoder<>(clazz);
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream) throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null SolrDocument");
+    }
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    JavaBinCodec codec = new JavaBinCodec();
+    codec.marshal(value, baos);
+
+    byte[] bytes = baos.toByteArray();
+    VarInt.encode(bytes.length, outStream);
+    outStream.write(bytes);
+  }
+
+  @Override
+  public T decode(InputStream inStream) throws IOException {
+    DataInputStream in = new DataInputStream(inStream);
+
+    int len = VarInt.decodeInt(in);
+    if (len < 0) {
+      throw new CoderException("Invalid encoded SolrDocument length: " + len);
+    }
+
+    JavaBinCodec codec = new JavaBinCodec();
+    return (T) codec.unmarshal(new BoundedInputStream(in, len));
+  }
+
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    return TypeDescriptor.of(clazz);
+  }
+
+  @AutoService(CoderProviderRegistrar.class)
+  public static class Provider implements CoderProviderRegistrar {
+    @Override
+    public List<CoderProvider> getCoderProviders() {
+      return Arrays.asList(
+          CoderProviders.forCoder(
+              TypeDescriptor.of(SolrDocument.class), JavaBinCodecCoder.of(SolrDocument.class)),
+          CoderProviders.forCoder(
+              TypeDescriptor.of(SolrInputDocument.class),
+              JavaBinCodecCoder.of(SolrInputDocument.class)));
+    }
+  }
+}
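
Because Provider is registered via @AutoService(CoderProviderRegistrar.class), the default coder registry resolves these coders without any explicit setCoder() call. A quick sketch of what that registration buys, using Beam's CoderRegistry API:

    CoderRegistry registry = CoderRegistry.createDefault();
    // resolves to JavaBinCodecCoder.of(SolrDocument.class) through the registrar above
    Coder<SolrDocument> coder = registry.getCoder(TypeDescriptor.of(SolrDocument.class));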

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
new file mode 100644
index 0000000..c137eea
--- /dev/null
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CursorMarkParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+
+/**
+ * Transforms for reading and writing data from/to Solr.
+ *
+ * <h3>Reading from Solr</h3>
+ *
+ * <p>{@link SolrIO#read SolrIO.read()} returns a bounded {@link PCollection
+ * PCollection&lt;SolrDocument&gt;} representing Solr documents.
+ *
+ * <p>To configure the {@link SolrIO#read}, you have to provide a connection configuration
+ * containing the Zookeeper address of the Solr cluster, and the collection name. The following
+ * example illustrates options for configuring the source:
+ *
+ * <pre>{@code
+ * SolrIO.ConnectionConfiguration conn = SolrIO.ConnectionConfiguration.create("127.0.0.1:9983");
+ * // Optionally: .withBasicCredentials(username, password)
+ *
+ * PCollection<SolrDocument> docs = p.apply(
+ *     SolrIO.read().from("my-collection").withConnectionConfiguration(conn));
+ *
+ * }</pre>
+ *
+ * <p>You can specify a query on the {@code read()} using {@code withQuery()}.
+ *
+ * <h3>Writing to Solr</h3>
+ *
+ * <p>To write documents to Solr, use {@link SolrIO#write SolrIO.write()}, which writes Solr
+ * documents from a {@link PCollection PCollection&lt;SolrInputDocument&gt;} (which can be bounded
+ * or unbounded).
+ *
+ * <p>To configure {@link SolrIO#write SolrIO.write()}, similar to the read, you have to provide a
+ * connection configuration and a collection name. For instance:
+ *
+ * <pre>{@code
+ * PCollection<SolrInputDocument> inputDocs = ...;
+ * inputDocs.apply(SolrIO.write().to("my-collection").withConnectionConfiguration(conn));
+ *
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class SolrIO {
+
+  public static Read read() {
+    // A batch size of 1000 is good enough in many cases:
+    // e.g. if documents are large, around 10KB, the response's size will be around 10MB;
+    // if documents are small, around 1KB, the response's size will be around 1MB
+    return new AutoValue_SolrIO_Read.Builder().setBatchSize(1000).setQuery("*:*").build();
+  }
+
+  public static Write write() {
+    // A batch size of 1000 is good enough in many cases:
+    // e.g. if documents are large, around 10KB, the request's size will be around 10MB;
+    // if documents are small, around 1KB, the request's size will be around 1MB
+    return new AutoValue_SolrIO_Write.Builder().setMaxBatchSize(1000).build();
+  }
+
+  private SolrIO() {}
+
+  /** A POJO describing a connection configuration to Solr. */
+  @AutoValue
+  public abstract static class ConnectionConfiguration implements Serializable {
+
+    abstract String getZkHost();
+
+    @Nullable
+    abstract String getUsername();
+
+    @Nullable
+    abstract String getPassword();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setZkHost(String zkHost);
+
+      abstract Builder setUsername(String username);
+
+      abstract Builder setPassword(String password);
+
+      abstract ConnectionConfiguration build();
+    }
+
+    /**
+     * Creates a new Solr connection configuration.
+     *
+     * @param zkHost the Zookeeper host address of the Solr cluster
+     * @return the connection configuration object
+     */
+    public static ConnectionConfiguration create(String zkHost) {
+      checkArgument(zkHost != null, "zkHost can not be null");
+      return new AutoValue_SolrIO_ConnectionConfiguration.Builder().setZkHost(zkHost).build();
+    }
+
+    /** If Solr basic authentication is enabled, provide the username and password. */
+    public ConnectionConfiguration withBasicCredentials(String username, String password) {
+      checkArgument(username != null, "username can not be null");
+      checkArgument(!username.isEmpty(), "username can not be empty");
+      checkArgument(password != null, "password can not be null");
+      checkArgument(!password.isEmpty(), "password can not be empty");
+      return builder().setUsername(username).setPassword(password).build();
+    }
+
+    private void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("zkHost", getZkHost()));
+      builder.addIfNotNull(DisplayData.item("username", getUsername()));
+    }
+
+    private HttpClient createHttpClient() {
+      // This works around a bug in Solr: if we don't create a customized HttpClient,
+      // an UpdateRequest with the commit flag set will throw an authentication error.
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(HttpClientUtil.PROP_BASIC_AUTH_USER, getUsername());
+      params.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, getPassword());
+      return HttpClientUtil.createClient(params);
+    }
+
+    AuthorizedSolrClient<CloudSolrClient> createClient() throws MalformedURLException {
+      CloudSolrClient solrClient = new CloudSolrClient(getZkHost(), createHttpClient());
+      return new AuthorizedSolrClient<>(solrClient, this);
+    }
+
+    AuthorizedSolrClient<HttpSolrClient> createClient(String shardUrl) {
+      HttpSolrClient solrClient = new HttpSolrClient(shardUrl, createHttpClient());
+      return new AuthorizedSolrClient<>(solrClient, this);
+    }
+  }
+
+  /** A {@link PTransform} reading data from Solr. */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<SolrDocument>> {
+    private static final long MAX_BATCH_SIZE = 10000L;
+
+    @Nullable
+    abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @Nullable
+    abstract String getCollection();
+
+    abstract String getQuery();
+
+    abstract int getBatchSize();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setQuery(String query);
+
+      abstract Builder setBatchSize(int batchSize);
+
+      abstract Builder setCollection(String collection);
+
+      abstract Read build();
+    }
+
+    /** Provide the Solr connection configuration object. */
+    public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
+      checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
+      return builder().setConnectionConfiguration(connectionConfiguration).build();
+    }
+
+    /**
+     * Provide the name of the collection to read from Solr.
+     *
+     * @param collection the collection toward which the requests will be issued
+     */
+    public Read from(String collection) {
+      checkArgument(collection != null, "collection can not be null");
+      return builder().setCollection(collection).build();
+    }
+
+    /**
+     * Provide a query used while reading from Solr.
+     *
+     * @param query the query. See <a
+     *     href="https://cwiki.apache.org/confluence/display/solr/The+Standard+Query+Parser">Solr
+     *     Query</a>
+     */
+    public Read withQuery(String query) {
+      checkArgument(query != null, "query can not be null");
+      checkArgument(!query.isEmpty(), "query can not be empty");
+      return builder().setQuery(query).build();
+    }
+
+    /**
+     * Provide a size for the cursor read. See the <a
+     * href="https://cwiki.apache.org/confluence/display/solr/Pagination+of+Results">cursor API</a>.
+     * Default is 1000. Maximum is 10,000 (exclusive). If documents are small, increasing the batch
+     * size might improve read performance. If documents are big, you might need to decrease it.
+     *
+     * @param batchSize number of documents read in each scroll read
+     */
+    @VisibleForTesting
+    Read withBatchSize(int batchSize) {
+      // TODO remove this configuration, we can figure out the best number
+      // by tuning batchSize when pipelines run.
+      checkArgument(
+          batchSize > 0 && batchSize < MAX_BATCH_SIZE,
+          "Valid values for batchSize are 1 (inclusize) to %s (exclusive), but was: %s ",
+          MAX_BATCH_SIZE,
+          batchSize);
+      return builder().setBatchSize(batchSize).build();
+    }
+
+    @Override
+    public PCollection<SolrDocument> expand(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedSolrSource(this, null)));
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      checkState(
+          getConnectionConfiguration() != null,
+          "Need to set connection configuration using withConnectionConfiguration()");
+      checkState(getCollection() != null, "Need to set collection name using from()");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("query", getQuery()));
+      getConnectionConfiguration().populateDisplayData(builder);
+    }
+  }
+
+  /** A POJO describing a replica of Solr. */
+  @AutoValue
+  abstract static class ReplicaInfo implements Serializable {
+    public abstract String coreName();
+
+    public abstract String coreUrl();
+
+    public abstract String baseUrl();
+
+    static ReplicaInfo create(Replica replica) {
+      return new AutoValue_SolrIO_ReplicaInfo(
+          replica.getStr(ZkStateReader.CORE_NAME_PROP),
+          replica.getCoreUrl(),
+          replica.getStr(ZkStateReader.BASE_URL_PROP));
+    }
+  }
+
+  /** A {@link BoundedSource} reading from Solr. */
+  @VisibleForTesting
+  static class BoundedSolrSource extends BoundedSource<SolrDocument> {
+
+    private final SolrIO.Read spec;
+    // replica is the info of the shard where the source will read the documents
+    @Nullable private final ReplicaInfo replica;
+
+    BoundedSolrSource(Read spec, @Nullable Replica replica) {
+      this.spec = spec;
+      this.replica = replica == null ? null : ReplicaInfo.create(replica);
+    }
+
+    @Override
+    public List<? extends BoundedSource<SolrDocument>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      ConnectionConfiguration connectionConfig = spec.getConnectionConfiguration();
+      List<BoundedSolrSource> sources = new ArrayList<>();
+      try (AuthorizedSolrClient<CloudSolrClient> client = connectionConfig.createClient()) {
+        String collection = spec.getCollection();
+        final ClusterState clusterState = AuthorizedSolrClient.getClusterState(client);
+        DocCollection docCollection = clusterState.getCollection(collection);
+        for (Slice slice : docCollection.getSlices()) {
+          ArrayList<Replica> replicas = new ArrayList<>(slice.getReplicas());
+          Collections.shuffle(replicas);
+          // Load balancing by randomly picking an active replica
+          Replica randomActiveReplica = null;
+          for (Replica replica : replicas) {
+            // We need to check both state of the replica and live nodes
+            // to make sure that the replica is alive
+            if (replica.getState() == Replica.State.ACTIVE
+                && clusterState.getLiveNodes().contains(replica.getNodeName())) {
+              randomActiveReplica = replica;
+              break;
+            }
+          }
+          // TODO: in case this replica goes inactive while the pipeline runs,
+          // we should pick another active replica of this shard.
+          checkState(
+              randomActiveReplica != null,
+              "Can not found an active replica for slice %s",
+              slice.getName());
+          sources.add(new BoundedSolrSource(spec, randomActiveReplica));
+        }
+      }
+      return sources;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
+      if (replica != null) {
+        return getEstimatedSizeOfShard(replica);
+      } else {
+        return getEstimatedSizeOfCollection();
+      }
+    }
+
+    private long getEstimatedSizeOfShard(ReplicaInfo replica) throws IOException {
+      try (AuthorizedSolrClient solrClient =
+          spec.getConnectionConfiguration().createClient(replica.baseUrl())) {
+        CoreAdminRequest req = new CoreAdminRequest();
+        req.setAction(CoreAdminParams.CoreAdminAction.STATUS);
+        req.setIndexInfoNeeded(true);
+        CoreAdminResponse response;
+        try {
+          response = solrClient.process(req);
+        } catch (SolrServerException e) {
+          throw new IOException("Can not get core status from " + replica, e);
+        }
+        NamedList<Object> coreStatus = response.getCoreStatus(replica.coreName());
+        NamedList<Object> indexStats = (NamedList<Object>) coreStatus.get("index");
+        return (long) indexStats.get("sizeInBytes");
+      }
+    }
+
+    private long getEstimatedSizeOfCollection() throws IOException {
+      long sizeInBytes = 0;
+      ConnectionConfiguration config = spec.getConnectionConfiguration();
+      try (AuthorizedSolrClient<CloudSolrClient> solrClient = config.createClient()) {
+        DocCollection docCollection =
+            AuthorizedSolrClient.getClusterState(solrClient).getCollection(spec.getCollection());
+        if (docCollection.getSlices().isEmpty()) {
+          return 0;
+        }
+
+        ArrayList<Slice> slices = new ArrayList<>(docCollection.getSlices());
+        Collections.shuffle(slices);
+        ExecutorService executor =
+            Executors.newCachedThreadPool(
+                new ThreadFactoryBuilder()
+                    .setThreadFactory(MoreExecutors.platformThreadFactory())
+                    .setDaemon(true)
+                    .setNameFormat("solrio-size-of-collection-estimation")
+                    .build());
+        try {
+          ArrayList<Future<Long>> futures = new ArrayList<>();
+          for (int i = 0; i < 100 && i < slices.size(); i++) {
+            Slice slice = slices.get(i);
+            final Replica replica = slice.getLeader();
+            Future<Long> future =
+                executor.submit(
+                    new Callable<Long>() {
+                      @Override
+                      public Long call() throws Exception {
+                        return getEstimatedSizeOfShard(ReplicaInfo.create(replica));
+                      }
+                    });
+            futures.add(future);
+          }
+          for (Future<Long> future : futures) {
+            try {
+              sizeInBytes += future.get();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new IOException(e);
+            } catch (ExecutionException e) {
+              throw new IOException("Cannot estimate size of shard", e.getCause());
+            }
+          }
+        } finally {
+          executor.shutdownNow();
+        }
+
+        if (slices.size() <= 100) {
+          return sizeInBytes;
+        }
+        return (sizeInBytes / 100) * slices.size();
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      spec.populateDisplayData(builder);
+      if (replica != null) {
+        builder.addIfNotNull(DisplayData.item("shardUrl", replica.coreUrl()));
+      }
+    }
+
+    @Override
+    public BoundedReader<SolrDocument> createReader(PipelineOptions options) throws IOException {
+      return new BoundedSolrReader(this);
+    }
+
+    @Override
+    public void validate() {
+      spec.validate(null);
+    }
+
+    @Override
+    public Coder<SolrDocument> getOutputCoder() {
+      return JavaBinCodecCoder.of(SolrDocument.class);
+    }
+  }
+
+  private static class BoundedSolrReader extends BoundedSource.BoundedReader<SolrDocument> {
+
+    private final BoundedSolrSource source;
+
+    private AuthorizedSolrClient solrClient;
+    private SolrDocument current;
+    private String cursorMark;
+    private Iterator<SolrDocument> batchIterator;
+    private boolean done;
+    private String uniqueKey;
+
+    private BoundedSolrReader(BoundedSolrSource source) {
+      this.source = source;
+      this.cursorMark = CursorMarkParams.CURSOR_MARK_START;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      if (source.replica != null) {
+        solrClient =
+            source.spec.getConnectionConfiguration().createClient(source.replica.baseUrl());
+      } else {
+        solrClient = source.spec.getConnectionConfiguration().createClient();
+      }
+      SchemaRequest.UniqueKey uniqueKeyRequest = new SchemaRequest.UniqueKey();
+      try {
+        String collection = source.spec.getCollection();
+        SchemaResponse.UniqueKeyResponse uniqueKeyResponse =
+            (SchemaResponse.UniqueKeyResponse) solrClient.process(collection, uniqueKeyRequest);
+        uniqueKey = uniqueKeyResponse.getUniqueKey();
+      } catch (SolrServerException e) {
+        throw new IOException("Can not get unique key from solr", e);
+      }
+      return advance();
+    }
+
+    private SolrQuery getQueryParams(BoundedSolrSource source) {
+      String query = source.spec.getQuery();
+      if (query == null) {
+        query = "*:*";
+      }
+      SolrQuery solrQuery = new SolrQuery(query);
+      solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+      solrQuery.setRows(source.spec.getBatchSize());
+      solrQuery.addSort(uniqueKey, SolrQuery.ORDER.asc);
+      if (source.replica != null) {
+        solrQuery.setDistrib(false);
+      }
+      return solrQuery;
+    }
+
+    private void updateCursorMark(QueryResponse response) {
+      if (cursorMark.equals(response.getNextCursorMark())) {
+        done = true;
+      }
+      cursorMark = response.getNextCursorMark();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (batchIterator != null && batchIterator.hasNext()) {
+        current = batchIterator.next();
+        return true;
+      } else {
+        SolrQuery solrQuery = getQueryParams(source);
+        try {
+          QueryResponse response;
+          if (source.replica != null) {
+            response = solrClient.query(source.replica.coreName(), solrQuery);
+          } else {
+            response = solrClient.query(source.spec.getCollection(), solrQuery);
+          }
+          updateCursorMark(response);
+          return readNextBatchAndReturnFirstDocument(response);
+        } catch (SolrServerException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+
+    private boolean readNextBatchAndReturnFirstDocument(QueryResponse response) {
+      if (done) {
+        current = null;
+        batchIterator = null;
+        return false;
+      }
+
+      batchIterator = response.getResults().iterator();
+      current = batchIterator.next();
+      return true;
+    }
+
+    @Override
+    public SolrDocument getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    public void close() throws IOException {
+      solrClient.close();
+    }
+
+    @Override
+    public BoundedSource<SolrDocument> getCurrentSource() {
+      return source;
+    }
+  }
+
+  /** A {@link PTransform} writing data to Solr. */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<SolrInputDocument>, PDone> {
+
+    @Nullable
+    abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @Nullable
+    abstract String getCollection();
+
+    abstract int getMaxBatchSize();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setCollection(String collection);
+
+      abstract Builder setMaxBatchSize(int maxBatchSize);
+
+      abstract Write build();
+    }
+
+    /** Provide the Solr connection configuration object. */
+    public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
+      checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
+      return builder().setConnectionConfiguration(connectionConfiguration).build();
+    }
+
+    /**
+     * Provide the name of the Solr collection to write to.
+     *
+     * @param collection the collection toward which the requests will be issued
+     */
+    public Write to(String collection) {
+      checkArgument(collection != null, "collection can not be null");
+      return builder().setCollection(collection).build();
+    }
+
+    /**
+     * Provide a maximum size in number of documents for the batch. Depending on the execution
+     * engine, the size of bundles may vary; this sets the maximum size. Change this if you need
+     * smaller batches.
+     *
+     * @param batchSize maximum batch size in number of documents
+     */
+    @VisibleForTesting
+    Write withMaxBatchSize(int batchSize) {
+      // TODO remove this configuration, we can figure out the best number
+      // by tuning batchSize when pipelines run.
+      checkArgument(batchSize > 0, "batchSize must be larger than 0, but was: %s", batchSize);
+      return builder().setMaxBatchSize(batchSize).build();
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      checkState(
+          getConnectionConfiguration() != null,
+          "Need to set connection configuration via withConnectionConfiguration()");
+      checkState(getCollection() != null, "Need to set collection name via to()");
+    }
+
+    @Override
+    public PDone expand(PCollection<SolrInputDocument> input) {
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @VisibleForTesting
+    static class WriteFn extends DoFn<SolrInputDocument, Void> {
+
+      private final Write spec;
+
+      private transient AuthorizedSolrClient solrClient;
+      private Collection<SolrInputDocument> batch;
+
+      WriteFn(Write spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void createClient() throws Exception {
+        solrClient = spec.getConnectionConfiguration().createClient();
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext context) throws Exception {
+        batch = new ArrayList<>();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        SolrInputDocument document = context.element();
+        batch.add(document);
+        if (batch.size() >= spec.getMaxBatchSize()) {
+          flushBatch();
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle(FinishBundleContext context) throws Exception {
+        flushBatch();
+      }
+
+      private void flushBatch() throws IOException {
+        if (batch.isEmpty()) {
+          return;
+        }
+        try {
+          UpdateRequest updateRequest = new UpdateRequest();
+          updateRequest.add(batch);
+          solrClient.process(spec.getCollection(), updateRequest);
+        } catch (SolrServerException e) {
+          throw new IOException("Error writing to Solr", e);
+        } finally {
+          batch.clear();
+        }
+      }
+
+      @Teardown
+      public void closeClient() throws Exception {
+        if (solrClient != null) {
+          solrClient.close();
+        }
+      }
+    }
+  }
+}
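
Putting the two transforms together, a round-trip pipeline assembled from the javadoc examples above might look like the following sketch (the Zookeeper address, credentials and collection names are illustrative):

    SolrIO.ConnectionConfiguration conn =
        SolrIO.ConnectionConfiguration.create("127.0.0.1:9983")
            .withBasicCredentials("solr", "SolrRocks");

    PCollection<SolrDocument> docs =
        p.apply(SolrIO.read().withConnectionConfiguration(conn).from("my-collection"));

    // ... convert each SolrDocument to a SolrInputDocument ...

    PCollection<SolrInputDocument> updates = ...;
    updates.apply(SolrIO.write().withConnectionConfiguration(conn).to("other-collection"));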

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java
new file mode 100644
index 0000000..83867ed
--- /dev/null
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing from/to Solr. */
+package org.apache.beam.sdk.io.solr;

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java
new file mode 100644
index 0000000..1fb435d
--- /dev/null
+++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.solr.common.SolrDocument;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test case for {@link JavaBinCodecCoder}. */
+@RunWith(JUnit4.class)
+public class JavaBinCodecCoderTest {
+  private static final Coder<SolrDocument> TEST_CODER = JavaBinCodecCoder.of(SolrDocument.class);
+  private static final List<SolrDocument> TEST_VALUES = new ArrayList<>();
+
+  static {
+    SolrDocument doc = new SolrDocument();
+    doc.put("id", "1");
+    doc.put("content", "wheel on the bus");
+    doc.put("_version_", 1573597324260671488L);
+    TEST_VALUES.add(doc);
+
+    doc = new SolrDocument();
+    doc.put("id", "2");
+    doc.put("content", "goes round and round");
+    doc.put("_version_", 1573597324260671489L);
+    TEST_VALUES.add(doc);
+  }
+
+  @Test
+  public void testDecodeEncodeEqual() throws Exception {
+    for (SolrDocument value : TEST_VALUES) {
+      CoderProperties.coderDecodeEncodeContentsInSameOrder(TEST_CODER, value);
+      CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value);
+    }
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void encodeNullThrowsCoderException() throws Exception {
+    thrown.expect(CoderException.class);
+    thrown.expectMessage("cannot encode a null SolrDocument");
+
+    CoderUtils.encodeToBase64(TEST_CODER, null);
+  }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(
+        TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(SolrDocument.class)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
new file mode 100644
index 0000000..4358ce4
--- /dev/null
+++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import com.google.common.io.BaseEncoding;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.security.Sha256AuthenticationProvider;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A test of {@link SolrIO} on an independent Solr instance. */
+@ThreadLeakScope(value = ThreadLeakScope.Scope.NONE)
+@SolrTestCaseJ4.SuppressSSL
+public class SolrIOTest extends SolrCloudTestCase {
+  private static final Logger LOG = LoggerFactory.getLogger(SolrIOTest.class);
+
+  private static final String SOLR_COLLECTION = "beam";
+  private static final int NUM_SHARDS = 3;
+  private static final long NUM_DOCS = 400L;
+  private static final int NUM_SCIENTISTS = 10;
+  private static final int BATCH_SIZE = 200;
+
+  private static AuthorizedSolrClient<CloudSolrClient> solrClient;
+  private static SolrIO.ConnectionConfiguration connectionConfiguration;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // set up credentials for the 'solr' user,
+    // see https://cwiki.apache.org/confluence/display/solr/Basic+Authentication+Plugin
+    String password = "SolrRocks";
+    // salt's size can be arbitrary
+    byte[] salt = new byte[random().nextInt(30) + 1];
+    random().nextBytes(salt);
+    String base64Salt = BaseEncoding.base64().encode(salt);
+    String sha256 = Sha256AuthenticationProvider.sha256(password, base64Salt);
+    String credential = sha256 + " " + base64Salt;
+    String securityJson =
+        "{"
+            + "'authentication':{"
+            + "  'blockUnknown': true,"
+            + "  'class':'solr.BasicAuthPlugin',"
+            + "  'credentials':{'solr':'"
+            + credential
+            + "'}}"
+            + "}";
+
+    configureCluster(3).addConfig("conf", getFile("cloud-minimal/conf").toPath()).configure();
+    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+    zkStateReader
+        .getZkClient()
+        .setData("/security.json", securityJson.getBytes(Charset.defaultCharset()), true);
+    String zkAddress = cluster.getZkServer().getZkAddress();
+    connectionConfiguration =
+        SolrIO.ConnectionConfiguration.create(zkAddress).withBasicCredentials("solr", password);
+    solrClient = connectionConfiguration.createClient();
+    SolrIOTestUtils.createCollection(SOLR_COLLECTION, NUM_SHARDS, 1, solrClient);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    solrClient.close();
+  }
+
+  @Before
+  public void before() throws Exception {
+    SolrIOTestUtils.clearCollection(SOLR_COLLECTION, solrClient);
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testBadCredentials() throws IOException {
+    thrown.expect(SolrException.class);
+
+    String zkAddress = cluster.getZkServer().getZkAddress();
+    SolrIO.ConnectionConfiguration connectionConfiguration =
+        SolrIO.ConnectionConfiguration.create(zkAddress)
+            .withBasicCredentials("solr", "wrongpassword");
+    try (AuthorizedSolrClient solrClient = connectionConfiguration.createClient()) {
+      SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+    }
+  }
+
+  @Test
+  public void testSizes() throws Exception {
+    SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    SolrIO.Read read =
+        SolrIO.read().withConnectionConfiguration(connectionConfiguration).from(SOLR_COLLECTION);
+    SolrIO.BoundedSolrSource initialSource = new SolrIO.BoundedSolrSource(read, null);
+    // we can't use an equality assert, as Solr collections never have the same size
+    // (due to the internal Lucene implementation)
+    long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+    LOG.info("Estimated size: {}", estimatedSize);
+    assertThat(
+        "Wrong estimated size bellow minimum",
+        estimatedSize,
+        greaterThan(SolrIOTestUtils.MIN_DOC_SIZE * NUM_DOCS));
+    assertThat(
+        "Wrong estimated size beyond maximum",
+        estimatedSize,
+        lessThan(SolrIOTestUtils.MAX_DOC_SIZE * NUM_DOCS));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+
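+    // batch size 101 does not divide NUM_DOCS evenly, presumably so the read
+    // also exercises a final, partial page of results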
+    PCollection<SolrDocument> output =
+        pipeline.apply(
+            SolrIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                .from(SOLR_COLLECTION)
+                .withBatchSize(101));
+    PAssert.thatSingleton(output.apply("Count", Count.<SolrDocument>globally()))
+        .isEqualTo(NUM_DOCS);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadWithQuery() throws Exception {
+    SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+
+    PCollection<SolrDocument> output =
+        pipeline.apply(
+            SolrIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                .from(SOLR_COLLECTION)
+                .withQuery("scientist:Franklin"));
+    PAssert.thatSingleton(output.apply("Count", Count.<SolrDocument>globally()))
+        .isEqualTo(NUM_DOCS / NUM_SCIENTISTS);
+    pipeline.run();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    List<SolrInputDocument> data = SolrIOTestUtils.createDocuments(NUM_DOCS);
+    SolrIO.Write write =
+        SolrIO.write().withConnectionConfiguration(connectionConfiguration).to(SOLR_COLLECTION);
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
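+    // inserted docs only become visible to searches after a commit;
+    // the helper below forces one before counting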
+    long currentNumDocs = SolrIOTestUtils.commitAndGetCurrentNumDocs(SOLR_COLLECTION, solrClient);
+    assertEquals(NUM_DOCS, currentNumDocs);
+
+    QueryResponse response = solrClient.query(SOLR_COLLECTION, new SolrQuery("scientist:Lovelace"));
+    assertEquals(NUM_DOCS / NUM_SCIENTISTS, response.getResults().getNumFound());
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSize() throws Exception {
+    SolrIO.Write write =
+        SolrIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .to(SOLR_COLLECTION)
+            .withMaxBatchSize(BATCH_SIZE);
+    // bundle size is the runner's decision: we cannot force it from the pipeline,
+    // so we test the WriteFn as a plain DoFn outside of a runner.
+    try (DoFnTester<SolrInputDocument, Void> fnTester =
+        DoFnTester.of(new SolrIO.Write.WriteFn(write))) {
+      List<SolrInputDocument> input = SolrIOTestUtils.createDocuments(NUM_DOCS);
+      long numDocsProcessed = 0;
+      long numDocsInserted = 0;
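+      // numDocsInserted tracks the count observed at the last batch flush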
+      for (SolrInputDocument document : input) {
+        fnTester.processElement(document);
+        numDocsProcessed++;
+        // check the count every 100 docs to avoid overloading Solr
+        if ((numDocsProcessed % 100) == 0) {
+          // force a commit after inserting so that the inserted docs
+          // become searchable immediately
+          long currentNumDocs =
+              SolrIOTestUtils.commitAndGetCurrentNumDocs(SOLR_COLLECTION, solrClient);
+          if ((numDocsProcessed % BATCH_SIZE) == 0) {
+            /* bundle end */
+            assertEquals(
+                "we are at the end of a bundle, we should have inserted all processed documents",
+                numDocsProcessed,
+                currentNumDocs);
+            numDocsInserted = currentNumDocs;
+          } else {
+            /* not bundle end */
+            assertEquals(
+                "we are not at the end of a bundle, we should have inserted no more documents",
+                numDocsInserted,
+                currentNumDocs);
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSplit() throws Exception {
+    SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    SolrIO.Read read =
+        SolrIO.read().withConnectionConfiguration(connectionConfiguration).from(SOLR_COLLECTION);
+    SolrIO.BoundedSolrSource initialSource = new SolrIO.BoundedSolrSource(read, null);
+    // desiredBundleSizeBytes is ignored for now: the source splits into one source per shard
+    int desiredBundleSizeBytes = 0;
+    List<? extends BoundedSource<SolrDocument>> splits =
+        initialSource.split(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
+
+    int expectedNumSplits = NUM_SHARDS;
+    assertEquals(expectedNumSplits, splits.size());
+    int nonEmptySplits = 0;
+    for (BoundedSource<SolrDocument> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    // docs are hashed by id across shards and, in this test, NUM_DOCS >> NUM_SHARDS,
+    // so no shard can be empty.
+    assertEquals("Wrong number of non-empty splits", expectedNumSplits, nonEmptySplits);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
new file mode 100644
index 0000000..808cd0f
--- /dev/null
+++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+
+/** Test utilities to use with {@link SolrIO}. */
+public class SolrIOTestUtils {
+  public static final long MIN_DOC_SIZE = 40L;
+  public static final long MAX_DOC_SIZE = 90L;
+
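+  /** Creates a collection with the given number of shards and replication factor. */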
+  static void createCollection(
+      String collection, int numShards, int replicationFactor, AuthorizedSolrClient client)
+      throws Exception {
+    CollectionAdminRequest.Create create =
+        new CollectionAdminRequest.Create()
+            .setCollectionName(collection)
+            .setNumShards(numShards)
+            .setReplicationFactor(replicationFactor)
+            .setMaxShardsPerNode(2);
+    client.process(create);
+  }
+
+  /** Inserts the given number of test documents into Solr. */
+  static void insertTestDocuments(String collection, long numDocs, AuthorizedSolrClient client)
+      throws IOException {
+    List<SolrInputDocument> data = createDocuments(numDocs);
+    try {
+      UpdateRequest updateRequest = new UpdateRequest();
+      updateRequest.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+      updateRequest.add(data);
+      client.process(collection, updateRequest);
+    } catch (SolrServerException e) {
+      throw new IOException("Failed to insert test documents to collection", e);
+    }
+  }
+
+  /** Deletes the given collection. */
+  static void deleteCollection(String collection, AuthorizedSolrClient client) throws IOException {
+    try {
+      CollectionAdminRequest.Delete delete =
+          new CollectionAdminRequest.Delete().setCollectionName(collection);
+      client.process(delete);
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Clears the given collection of all documents. */
+  static void clearCollection(String collection, AuthorizedSolrClient client) throws IOException {
+    try {
+      UpdateRequest updateRequest = new UpdateRequest();
+      updateRequest.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+      updateRequest.deleteByQuery("*:*");
+      client.process(collection, updateRequest);
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Forces a commit of the given collection to make recently inserted documents available for
+   * search.
+   *
+   * @return The number of docs in the index
+   */
+  static long commitAndGetCurrentNumDocs(String collection, AuthorizedSolrClient client)
+      throws IOException {
+    SolrQuery solrQuery = new SolrQuery("*:*");
+    // rows=0: only the total count (numFound) is needed, not the documents themselves
+    solrQuery.setRows(0);
+    try {
+      UpdateRequest update = new UpdateRequest();
+      update.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+      client.process(collection, update);
+
+      return client.query(collection, solrQuery).getResults().getNumFound();
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Generates a list of test documents for insertion.
+   *
+   * @return the list of {@link SolrInputDocument} representing the test documents
+   */
+  static List<SolrInputDocument> createDocuments(long numDocs) {
+    String[] scientists = {
+      "Lovelace",
+      "Franklin",
+      "Meitner",
+      "Hopper",
+      "Curie",
+      "Faraday",
+      "Newton",
+      "Bohr",
+      "Galilei",
+      "Maxwell"
+    };
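+    // names are assigned round-robin, so each scientist appears in numDocs / 10 documents;
+    // the read and write tests rely on this distribution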
+    ArrayList<SolrInputDocument> data = new ArrayList<>();
+    for (int i = 0; i < numDocs; i++) {
+      int index = i % scientists.length;
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.setField("id", String.valueOf(i));
+      doc.setField("scientist", scientists[index]);
+      data.add(doc);
+    }
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..08a1716
--- /dev/null
+++ b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  <!-- for versioning -->
+  <field name="_version_" type="long" indexed="true" stored="true"/>
+  <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+  <field name="id" type="string" indexed="true" stored="true"/>
+  <dynamicField name="*_s"  type="string"  indexed="true"  stored="true" />
+  <uniqueKey>id</uniqueKey>
+</schema>

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..8da7d28
--- /dev/null
+++ b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+</config>
+

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index e1adb79..1fb6e41 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -172,6 +172,11 @@
       <artifactId>beam-sdks-java-io-mqtt</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-solr</artifactId>
+    </dependency>
+
     <!-- provided and optional dependencies.-->
     <dependency>
       <groupId>com.google.auto.service</groupId>


[2/2] beam git commit: This closes #3618: [BEAM-2657] Create Solr IO

Posted by jk...@apache.org.
This closes #3618: [BEAM-2657] Create Solr IO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5714f22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5714f22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5714f22

Branch: refs/heads/master
Commit: f5714f2203974a0f97f778d62a82678025bd425e
Parents: 2fa4fde d00ff9e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 8 11:54:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 8 11:54:25 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   8 +-
 .../sdk/io/common/IOTestPipelineOptions.java    |   6 +
 sdks/java/io/pom.xml                            |   1 +
 sdks/java/io/solr/pom.xml                       | 147 ++++
 .../beam/sdk/io/solr/AuthorizedSolrClient.java  |  91 +++
 .../beam/sdk/io/solr/JavaBinCodecCoder.java     |  98 +++
 .../org/apache/beam/sdk/io/solr/SolrIO.java     | 717 +++++++++++++++++++
 .../apache/beam/sdk/io/solr/package-info.java   |  20 +
 .../beam/sdk/io/solr/JavaBinCodecCoderTest.java |  81 +++
 .../org/apache/beam/sdk/io/solr/SolrIOTest.java | 269 +++++++
 .../beam/sdk/io/solr/SolrIOTestUtils.java       | 132 ++++
 .../resources/cloud-minimal/conf/schema.xml     |  29 +
 .../resources/cloud-minimal/conf/solrconfig.xml |  48 ++
 sdks/java/javadoc/pom.xml                       |   5 +
 14 files changed, 1651 insertions(+), 1 deletion(-)
----------------------------------------------------------------------