You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/01 16:38:09 UTC

[pulsar] branch master updated: [Issue 3884] [pulsar-io] Add a Pulsar IO connector for Solr sink (#3885)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ef6d8d8  [Issue 3884] [pulsar-io] Add a Pulsar IO connector for Solr sink (#3885)
ef6d8d8 is described below

commit ef6d8d88de1e9fd320afb5da8d7e22c3f0a21922
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Apr 2 00:38:04 2019 +0800

    [Issue 3884] [pulsar-io] Add a Pulsar IO connector for Solr sink (#3885)
    
    * Add a Pulsar IO connector for Solr sink.
    
    * Remove empty lines and add a test scope in pom
    
    * Add license header in pom
---
 pulsar-io/pom.xml                                  |   1 +
 pulsar-io/solr/pom.xml                             |  86 +++++++++++
 .../apache/pulsar/io/solr/SolrAbstractSink.java    | 131 ++++++++++++++++
 .../pulsar/io/solr/SolrGenericRecordSink.java      |  53 +++++++
 .../org/apache/pulsar/io/solr/SolrSinkConfig.java  | 104 +++++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 +++
 .../pulsar/io/solr/SolrGenericRecordSinkTest.java  | 109 +++++++++++++
 .../org/apache/pulsar/io/solr/SolrServerUtil.java  |  91 +++++++++++
 .../apache/pulsar/io/solr/SolrSinkConfigTest.java  | 168 +++++++++++++++++++++
 pulsar-io/solr/src/test/resources/sinkConfig.yaml  |  27 ++++
 pulsar-io/solr/src/test/resources/solr.xml         |  38 +++++
 site2/docs/io-connectors.md                        |   1 +
 site2/docs/io-solr.md                              |  21 +++
 13 files changed, 852 insertions(+)

diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index d45ba7b..5a987f7 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -54,6 +54,7 @@
     <module>mongo</module>
     <module>flume</module>
     <module>redis</module>
+    <module>solr</module>
   </modules>
 
 </project>
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
new file mode 100644
index 0000000..0c0be20
--- /dev/null
+++ b/pulsar-io/solr/pom.xml
@@ -0,0 +1,86 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>pulsar-io</artifactId>
+        <groupId>org.apache.pulsar</groupId>
+        <version>2.4.0-SNAPSHOT</version>
+    </parent>
+
+    <properties>
+        <solr.version>7.5.0</solr.version>
+    </properties>
+
+    <artifactId>pulsar-io-solr</artifactId>
+    <name>Pulsar IO :: Solr</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-functions-instance</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-solrj</artifactId>
+            <version>${solr.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-core</artifactId>
+            <version>${solr.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.4</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <version>3.2.2</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
new file mode 100644
index 0000000..d7bcb12
--- /dev/null
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple abstract class for Solr sink
+ */
+@Slf4j
+public abstract class SolrAbstractSink<T> implements Sink<T> {
+
+    private SolrSinkConfig solrSinkConfig;
+    private SolrClient client;
+    private boolean enableBasicAuth;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        // Load and validate the settings, then build the client for the configured Solr mode.
+        solrSinkConfig = SolrSinkConfig.load(config);
+        solrSinkConfig.validate();
+        // Basic auth is attached per request in write() only when a username is configured.
+        enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername());
+        SolrMode solrMode;
+        try {
+            // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
+            solrMode = SolrMode.valueOf(solrSinkConfig.getSolrMode().toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Illegal Solr mode, valid values are: "
+                + Arrays.asList(SolrMode.values()), e); // keep the original failure as the cause
+        }
+        client = getClient(solrMode, solrSinkConfig.getSolrUrl());
+    }
+
+    @Override
+    public void write(Record<T> record) {
+        UpdateRequest updateRequest = new UpdateRequest();
+        if (solrSinkConfig.getSolrCommitWithinMs() > 0) {
+            updateRequest.setCommitWithin(solrSinkConfig.getSolrCommitWithinMs());
+        }
+        if (enableBasicAuth) { // set in open(): true when a non-empty username is configured
+            updateRequest.setBasicAuthCredentials(
+                solrSinkConfig.getUsername(),
+                solrSinkConfig.getPassword()
+            );
+        }
+        // Each record is converted to a single document and sent in its own update request.
+        SolrInputDocument document = convert(record);
+        updateRequest.add(document);
+        // Ack only on response status 0; any other status, or one of the caught exceptions, fails the record.
+        try {
+            UpdateResponse updateResponse = updateRequest.process(client, solrSinkConfig.getSolrCollection());
+            if (updateResponse.getStatus() == 0) {
+                record.ack();
+            } else {
+                record.fail();
+            }
+        } catch (SolrServerException | IOException e) { // NOTE(review): other exceptions propagate without ack/fail
+            record.fail();
+            log.warn("Solr update document exception ", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    // convert record as a Solr document
+    public abstract SolrInputDocument convert(Record<T> message);
+
+    public static SolrClient getClient(SolrMode solrMode, String url) { // builds the client matching solrMode
+        SolrClient solrClient = null;
+        if (solrMode.equals(SolrMode.STANDALONE)) { // url is passed to the HTTP client builder unchanged
+            HttpSolrClient.Builder builder = new HttpSolrClient.Builder(url);
+            solrClient = builder.build();
+        }
+        if (solrMode.equals(SolrMode.SOLRCLOUD)) { // url is parsed as "host1,host2[/chroot]"
+            int chrootIndex = url.indexOf("/"); // the chroot, if any, starts at the first '/'
+            Optional<String> chroot = Optional.empty();
+            if (chrootIndex > 0) {
+                chroot = Optional.of(url.substring(chrootIndex)); // keeps the leading '/'
+            }
+            String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url;
+            List<String> zkHosts = Arrays.asList(zkUrls.split(",")); // comma-separated host list
+            CloudSolrClient.Builder builder = new CloudSolrClient.Builder(zkHosts, chroot);
+            solrClient = builder.build();
+        }
+        return solrClient;
+    }
+
+    public enum SolrMode {
+        STANDALONE,
+        SOLRCLOUD
+    }
+}
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java
new file mode 100644
index 0000000..e1df871
--- /dev/null
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.solr.common.SolrInputDocument;
+
+import java.util.List;
+
+/**
+ * A simple Solr sink, which interprets input Record in generic record.
+ */
+@Connector(
+    name = "solr",
+    type = IOType.SINK,
+    help = "The SolrGenericRecordSink is used for moving messages from Pulsar to Solr.",
+    configClass = SolrSinkConfig.class
+)
+@Slf4j
+public class SolrGenericRecordSink extends SolrAbstractSink<GenericRecord> {
+    @Override
+    public SolrInputDocument convert(Record<GenericRecord> message) {
+        // Mirror the record into a Solr document: one document field per record field,
+        // keyed by the field name and carrying the value exactly as the record returns it.
+        GenericRecord genericRecord = message.getValue();
+        SolrInputDocument solrDocument = new SolrInputDocument();
+        for (Field schemaField : genericRecord.getFields()) {
+            solrDocument.setField(schemaField.getName(), genericRecord.getField(schemaField));
+        }
+        return solrDocument;
+    }
+}
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
new file mode 100644
index 0000000..7176c58
--- /dev/null
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration class for the Solr Sink Connector.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class SolrSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = -4849066206354610110L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Comma separated zookeeper hosts with chroot used in SolrCloud mode (eg: localhost:2181,localhost:2182/chroot)"
+            + " or Url to connect to solr used in Standalone mode (e.g. localhost:8983/solr)"
+    )
+    private String solrUrl;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "SolrCloud",
+        help = "The client mode to use when interacting with the Solr cluster. Possible values [Standalone, SolrCloud]")
+    private String solrMode = "SolrCloud";
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Solr collection name to which records need to be written")
+    private String solrCollection;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10",
+        help = "Commit within milli seconds for solr update, if none passes defaults to 10 ms")
+    private int solrCommitWithinMs = 10;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The username to use for basic authentication")
+    private String username;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The password to use for basic authentication")
+    private String password;
+
+    public static SolrSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), SolrSinkConfig.class);
+    }
+
+    public static SolrSinkConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(); // a single mapper serializes the map and parses it back
+        return mapper.readValue(mapper.writeValueAsString(map), SolrSinkConfig.class);
+    }
+
+    public void validate() {
+        Preconditions.checkNotNull(solrUrl, "solrUrl property not set.");
+        Preconditions.checkNotNull(solrMode, "solrMode property not set.");
+        Preconditions.checkNotNull(solrCollection, "solrCollection property not set.");
+        Preconditions.checkArgument(solrCommitWithinMs > 0, "solrCommitWithinMs must be a positive integer.");
+    }
+}
diff --git a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..fdf223a
--- /dev/null
+++ b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: solr
+description: Writes data into solr collection
+sinkClass: org.apache.pulsar.io.solr.SolrGenericRecordSink
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
new file mode 100644
index 0000000..72462e7
--- /dev/null
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * solr Sink test
+ */
+@Slf4j
+public class SolrGenericRecordSinkTest {
+
+    private SolrServerUtil solrServerUtil;
+
+    /**
+     * A Simple class to test solr class
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        solrServerUtil = new SolrServerUtil(8983);
+        solrServerUtil.startStandaloneSolr();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        solrServerUtil.stopStandaloneSolr();
+    }
+
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("solrUrl", "http://localhost:8983/solr");
+        configs.put("solrMode", "Standalone");
+        configs.put("solrCollection", "techproducts");
+        configs.put("solrCommitWithinMs", "100");
+        configs.put("username", "");
+        configs.put("password", "");
+        // NOTE: despite the test name, only open() is exercised below; write() is never called on the sink.
+        SolrGenericRecordSink sink = new SolrGenericRecordSink();
+
+        // prepare a foo Record
+        Foo obj = new Foo();
+        obj.setField1("FakeFiled1");
+        obj.setField2("FakeFiled1");
+        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+
+        byte[] bytes = schema.encode(obj);
+        ByteBuf payload = Unpooled.copiedBuffer(bytes);
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
+
+        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
+        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
+            .message(message)
+            .topicName("fake_topic_name")
+            .build();
+
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+            obj.toString(),
+            message.getValue().toString(),
+            record.getValue().toString());
+
+        // open should succeed
+        sink.open(configs, null);
+    }
+}
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java
new file mode 100644
index 0000000..033d0d2
--- /dev/null
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+
+import java.io.File;
+
+@Slf4j
+public class SolrServerUtil {
+    private JettySolrRunner standaloneSolr;
+    private int port;
+
+    public SolrServerUtil(int port) {
+        this.port = port;
+    }
+
+    public void startStandaloneSolr() throws Exception {
+        if (standaloneSolr != null) {
+            throw new IllegalStateException("Test is already running a standalone Solr instance " +
+                standaloneSolr.getBaseUrl() + "! This indicates a bug in the unit test logic.");
+        }
+        // Build a Solr home under the system temp directory and seed it with the solr.xml found on the classpath.
+        File solrHomeDir = new File(FileUtils.getTempDirectory().getPath() + "/solr_home");
+        String solrXml = "solr.xml";
+        FileUtils.copyFile(getFile(solrXml), new File(solrHomeDir.getAbsolutePath() + "/" + solrXml));
+        File solrLogDir = new File(solrHomeDir.getPath() + "/solr_logs");
+
+        createTempDir(solrHomeDir);
+        createTempDir(solrLogDir);
+
+        System.setProperty("host", "localhost");
+        System.setProperty("jetty.port", String.valueOf(port));
+        System.setProperty("solr.log.dir", solrLogDir.getAbsolutePath());
+
+        standaloneSolr = new JettySolrRunner(solrHomeDir.getAbsolutePath(), "/solr", port);
+        Thread bg = new Thread() { // NOTE(review): started but never joined, so this method can return before Solr is up
+            public void run() {
+                try {
+                    standaloneSolr.start();
+                } catch (Exception e) {
+                    if (e instanceof RuntimeException) { // rethrown inside bg only; the calling thread never sees it
+                        throw (RuntimeException)e;
+                    } else {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+        bg.start();
+    }
+
+    public void stopStandaloneSolr() {
+        if (standaloneSolr != null) {
+            try {
+                standaloneSolr.stop();
+            } catch (Exception e) {
+                log.error("Failed to stop standalone solr.");
+            }
+        }
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+
+    private void createTempDir(File file) { // ensures file exists as a directory, creating parents as needed
+        if (!file.isDirectory() && !file.mkdirs()) { // mkdirs() reports failure only through its return value
+            throw new IllegalStateException("Could not create directory " + file.getAbsolutePath());
+        }
+    }
+}
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
new file mode 100644
index 0000000..493542f
--- /dev/null
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import com.google.common.collect.Lists;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * SolrSinkConfig test
+ */
+public class SolrSinkConfigTest {
+
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        String path = yamlFile.getAbsolutePath();
+        SolrSinkConfig config = SolrSinkConfig.load(path);
+        assertNotNull(config);
+        assertEquals("localhost:2181,localhost:2182/chroot", config.getSolrUrl());
+        assertEquals("SolrCloud", config.getSolrMode());
+        assertEquals("techproducts", config.getSolrCollection());
+        assertEquals(Integer.parseInt("100"), config.getSolrCommitWithinMs());
+        assertEquals("fakeuser", config.getUsername());
+        assertEquals("fake@123", config.getPassword());
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+        map.put("solrMode", "SolrCloud");
+        map.put("solrCollection", "techproducts");
+        map.put("solrCommitWithinMs", "100");
+        map.put("username", "fakeuser");
+        map.put("password", "fake@123");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        assertNotNull(config);
+        assertEquals("localhost:2181,localhost:2182/chroot", config.getSolrUrl());
+        assertEquals("SolrCloud", config.getSolrMode());
+        assertEquals("techproducts", config.getSolrCollection());
+        assertEquals(Integer.parseInt("100"), config.getSolrCommitWithinMs());
+        assertEquals("fakeuser", config.getUsername());
+        assertEquals("fake@123", config.getPassword());
+    }
+
+    @Test
+    public final void validValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+        map.put("solrMode", "SolrCloud");
+        map.put("solrCollection", "techproducts");
+        map.put("solrCommitWithinMs", "100");
+        map.put("username", "fakeuser");
+        map.put("password", "fake@123");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class,
+        expectedExceptionsMessageRegExp = "solrUrl property not set.")
+    public final void missingValidValidateSolrModeTest() throws IOException { // NOTE: name says SolrMode, but solrUrl is what is missing
+        Map<String, Object> map = new HashMap<>(); // solrUrl is deliberately left out of this map
+        map.put("solrMode", "SolrCloud");
+        map.put("solrCollection", "techproducts");
+        map.put("solrCommitWithinMs", "100");
+        map.put("username", "fakeuser");
+        map.put("password", "fake@123");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        config.validate(); // expected to throw from the solrUrl null check
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "solrCommitWithinMs must be a positive integer.")
+    public final void invalidBatchTimeMsTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+        map.put("solrMode", "SolrCloud");
+        map.put("solrCollection", "techproducts");
+        map.put("solrCommitWithinMs", "-100");
+        map.put("username", "fakeuser");
+        map.put("password", "fake@123");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "No enum constant org.apache.pulsar.io.solr.SolrAbstractSink.SolrMode.NOTSUPPORT")
+    public final void invalidClientModeTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+        map.put("solrMode", "NotSupport");
+        map.put("solrCollection", "techproducts");
+        map.put("solrCommitWithinMs", "100");
+        map.put("username", "fakeuser");
+        map.put("password", "fake@123");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        config.validate();
+
+        SolrAbstractSink.SolrMode.valueOf(config.getSolrMode().toUpperCase());
+    }
+
+    @Test
+    public final void validZkChrootTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+        map.put("solrMode", "SolrCloud");
+        map.put("solrCollection", "techproducts");
+        map.put("solrCommitWithinMs", "100");
+        map.put("username", "fakeuser");
+        map.put("password", "fake@123");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        config.validate();
+
+        String url = config.getSolrUrl();
+        int chrootIndex = url.indexOf("/");
+        Optional<String> chroot = Optional.empty();
+        if (chrootIndex > 0) {
+            chroot = Optional.of(url.substring(chrootIndex));
+        }
+        String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url;
+        List<String> zkHosts = Arrays.asList(zkUrls.split(","));
+
+        List<String> expectedZkHosts = Lists.newArrayList();
+        expectedZkHosts.add("localhost:2181");
+        expectedZkHosts.add("localhost:2182");
+
+        assertEquals("/chroot", chroot.get());
+        assertEquals(expectedZkHosts, zkHosts);
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git a/pulsar-io/solr/src/test/resources/sinkConfig.yaml b/pulsar-io/solr/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..d96b353
--- /dev/null
+++ b/pulsar-io/solr/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"solrUrl": "localhost:2181,localhost:2182/chroot",
+"solrMode": "SolrCloud",
+"solrCollection": "techproducts",
+"solrCommitWithinMs": "100",
+"username": "fakeuser",
+"password": "fake@123"
+}
diff --git a/pulsar-io/solr/src/test/resources/solr.xml b/pulsar-io/solr/src/test/resources/solr.xml
new file mode 100644
index 0000000..495a71f
--- /dev/null
+++ b/pulsar-io/solr/src/test/resources/solr.xml
@@ -0,0 +1,38 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<solr>
+    <solrcloud>
+        <str name="host">${host:}</str>
+        <int name="hostPort">${jetty.port:8983}</int>
+        <str name="hostContext">${hostContext:solr}</str>
+        <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
+        <int name="zkClientTimeout">${zkClientTimeout:30000}</int>
+        <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:600000}</int>
+        <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
+        <str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
+        <str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
+    </solrcloud>
+    <shardHandlerFactory name="shardHandlerFactory"
+                         class="HttpShardHandlerFactory">
+        <int name="socketTimeout">${socketTimeout:600000}</int>
+        <int name="connTimeout">${connTimeout:60000}</int>
+    </shardHandlerFactory>
+</solr>
\ No newline at end of file
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 2f166ac..6f94d3c 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -23,3 +23,4 @@ Pulsar Functions cluster.
 - [File Source Connector](io-file.md#source)
 - [Hdfs Sink Connector](io-hdfs.md#sink)
 - [MongoDB Sink Connector](io-mongo.md#sink)
+- [Solr Sink Connector](io-solr.md#sink)
diff --git a/site2/docs/io-solr.md b/site2/docs/io-solr.md
new file mode 100644
index 0000000..7fdcf36
--- /dev/null
+++ b/site2/docs/io-solr.md
@@ -0,0 +1,21 @@
+---
+id: io-solr
+title: Solr Connector
+sidebar_label: Solr Connector
+---
+
+## Sink
+
+The Solr Sink Connector is used to pull messages from Pulsar topics and persist the messages
+to a Solr collection.
+
+## Sink Configuration Options
+
+| Name | Default | Required | Description |
+|------|---------|----------|-------------|
+| `solrUrl` | `null` | `true` | Comma-separated ZooKeeper hosts with chroot, used in SolrCloud mode (e.g. localhost:2181,localhost:2182/chroot), or the URL to connect to Solr, used in Standalone mode (e.g. localhost:8983/solr). |
+| `solrMode` | `SolrCloud` | `true` | The client mode to use when interacting with the Solr cluster. Possible values [Standalone, SolrCloud]. |
+| `solrCollection` | `null` | `true` | Solr collection name to which records need to be written. |
+| `solrCommitWithinMs` | `10` | `false` | Commit-within time in milliseconds for Solr updates; defaults to 10 ms if not set. |
+| `username` | `null` | `false` | The username to use for basic authentication. |
+| `password` | `null` | `false` | The password to use for basic authentication. |
\ No newline at end of file