You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/01 16:38:09 UTC
[pulsar] branch master updated: [Issue 3884] [pulsar-io] Add a
Pulsar IO connector for Solr sink (#3885)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ef6d8d8 [Issue 3884] [pulsar-io] Add a Pulsar IO connector for Solr sink (#3885)
ef6d8d8 is described below
commit ef6d8d88de1e9fd320afb5da8d7e22c3f0a21922
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Apr 2 00:38:04 2019 +0800
[Issue 3884] [pulsar-io] Add a Pulsar IO connector for Solr sink (#3885)
* Add a Pulsar IO connector for Solr sink.
* Remove empty lines and add a test scope in pom
* Add license header in pom
---
pulsar-io/pom.xml | 1 +
pulsar-io/solr/pom.xml | 86 +++++++++++
.../apache/pulsar/io/solr/SolrAbstractSink.java | 131 ++++++++++++++++
.../pulsar/io/solr/SolrGenericRecordSink.java | 53 +++++++
.../org/apache/pulsar/io/solr/SolrSinkConfig.java | 104 +++++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 22 +++
.../pulsar/io/solr/SolrGenericRecordSinkTest.java | 109 +++++++++++++
.../org/apache/pulsar/io/solr/SolrServerUtil.java | 91 +++++++++++
.../apache/pulsar/io/solr/SolrSinkConfigTest.java | 168 +++++++++++++++++++++
pulsar-io/solr/src/test/resources/sinkConfig.yaml | 27 ++++
pulsar-io/solr/src/test/resources/solr.xml | 38 +++++
site2/docs/io-connectors.md | 1 +
site2/docs/io-solr.md | 21 +++
13 files changed, 852 insertions(+)
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index d45ba7b..5a987f7 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -54,6 +54,7 @@
<module>mongo</module>
<module>flume</module>
<module>redis</module>
+ <module>solr</module>
</modules>
</project>
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
new file mode 100644
index 0000000..0c0be20
--- /dev/null
+++ b/pulsar-io/solr/pom.xml
@@ -0,0 +1,86 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pulsar-io</artifactId>
+ <groupId>org.apache.pulsar</groupId>
+ <version>2.4.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <solr.version>7.5.0</solr.version>
+ </properties>
+
+ <artifactId>pulsar-io-solr</artifactId>
+ <name>Pulsar IO :: Solr</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>${solr.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-core</artifactId>
+ <version>${solr.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.4</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
new file mode 100644
index 0000000..d7bcb12
--- /dev/null
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple abstract class for Solr sink
+ */
+@Slf4j
+public abstract class SolrAbstractSink<T> implements Sink<T> {
+
+ private SolrSinkConfig solrSinkConfig;
+ private SolrClient client;
+ private boolean enableBasicAuth;
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+ solrSinkConfig = SolrSinkConfig.load(config);
+ solrSinkConfig.validate();
+
+ enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername());
+
+ SolrMode solrMode;
+ try {
+ solrMode = SolrMode.valueOf(solrSinkConfig.getSolrMode().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Illegal Solr mode, valid values are: "
+ + Arrays.asList(SolrMode.values()));
+ }
+
+ client = getClient(solrMode, solrSinkConfig.getSolrUrl());
+ }
+
+ @Override
+ public void write(Record<T> record) {
+ UpdateRequest updateRequest = new UpdateRequest();
+ if (solrSinkConfig.getSolrCommitWithinMs() > 0) {
+ updateRequest.setCommitWithin(solrSinkConfig.getSolrCommitWithinMs());
+ }
+ if (enableBasicAuth) {
+ updateRequest.setBasicAuthCredentials(
+ solrSinkConfig.getUsername(),
+ solrSinkConfig.getPassword()
+ );
+ }
+
+ SolrInputDocument document = convert(record);
+ updateRequest.add(document);
+
+ try {
+ UpdateResponse updateResponse = updateRequest.process(client, solrSinkConfig.getSolrCollection());
+ if (updateResponse.getStatus() == 0) {
+ record.ack();
+ } else {
+ record.fail();
+ }
+ } catch (SolrServerException | IOException e) {
+ record.fail();
+ log.warn("Solr update document exception ", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ // convert record as a Solr document
+ public abstract SolrInputDocument convert(Record<T> message);
+
+ public static SolrClient getClient(SolrMode solrMode, String url) {
+ SolrClient solrClient = null;
+ if (solrMode.equals(SolrMode.STANDALONE)) {
+ HttpSolrClient.Builder builder = new HttpSolrClient.Builder(url);
+ solrClient = builder.build();
+ }
+ if (solrMode.equals(SolrMode.SOLRCLOUD)) {
+ int chrootIndex = url.indexOf("/");
+ Optional<String> chroot = Optional.empty();
+ if (chrootIndex > 0) {
+ chroot = Optional.of(url.substring(chrootIndex));
+ }
+ String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url;
+ List<String> zkHosts = Arrays.asList(zkUrls.split(","));
+ CloudSolrClient.Builder builder = new CloudSolrClient.Builder(zkHosts, chroot);
+ solrClient = builder.build();
+ }
+ return solrClient;
+ }
+
+ public enum SolrMode {
+ STANDALONE,
+ SOLRCLOUD
+ }
+}
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java
new file mode 100644
index 0000000..e1df871
--- /dev/null
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.solr.common.SolrInputDocument;
+
+import java.util.List;
+
+/**
+ * Solr sink that indexes the {@link GenericRecord} value of each Pulsar record as one Solr
+ * document. Every schema field name becomes a Solr field name, and the record's value for
+ * that field becomes the field value.
+ */
+@Connector(
+    name = "solr",
+    type = IOType.SINK,
+    help = "The SolrGenericRecordSink is used for moving messages from Pulsar to Solr.",
+    configClass = SolrSinkConfig.class
+)
+@Slf4j
+public class SolrGenericRecordSink extends SolrAbstractSink<GenericRecord> {
+
+    /**
+     * Copies every field of the record's value into a new Solr document.
+     *
+     * @param message the record whose {@link GenericRecord} value is converted
+     * @return a document holding one Solr field per schema field
+     */
+    @Override
+    public SolrInputDocument convert(Record<GenericRecord> message) {
+        final GenericRecord value = message.getValue();
+        final SolrInputDocument solrDocument = new SolrInputDocument();
+        final List<Field> schemaFields = value.getFields();
+        schemaFields.forEach(f -> solrDocument.setField(f.getName(), value.getField(f)));
+        return solrDocument;
+    }
+}
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
new file mode 100644
index 0000000..7176c58
--- /dev/null
+++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration class for the Solr Sink Connector.
+ *
+ * <p>{@code password} is excluded from {@link #toString()} so the configuration can be
+ * logged without leaking credentials.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString(exclude = "password")
+@Accessors(chain = true)
+public class SolrSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = -4849066206354610110L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Comma separated zookeeper hosts with chroot used in SolrCloud mode (eg: localhost:2181,localhost:2182/chroot)"
+            + " or Url to connect to solr used in Standalone mode (e.g. http://localhost:8983/solr)"
+    )
+    private String solrUrl;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "SolrCloud",
+        help = "The client mode to use when interacting with the Solr cluster. Possible values [Standalone, SolrCloud]")
+    private String solrMode = "SolrCloud";
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "Solr collection name to which records need to be written")
+    private String solrCollection;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10",
+        help = "Commit within milliseconds for solr update, defaults to 10 ms if not set")
+    private int solrCommitWithinMs = 10;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The username to use for basic authentication")
+    private String username;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The password to use for basic authentication")
+    private String password;
+
+    /**
+     * Loads a configuration from a YAML file.
+     *
+     * @param yamlFile path to the YAML file
+     * @return the parsed configuration (not yet validated)
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static SolrSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), SolrSinkConfig.class);
+    }
+
+    /**
+     * Loads a configuration from a connector config map by round-tripping it through JSON,
+     * so map values are bound to fields by name (e.g. the string "100" to an int field).
+     *
+     * @param map connector configuration
+     * @return the parsed configuration (not yet validated)
+     * @throws IOException if the map cannot be bound to this class
+     */
+    public static SolrSinkConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), SolrSinkConfig.class);
+    }
+
+    /**
+     * Checks that the required properties are present and that the commit window is positive.
+     *
+     * @throws NullPointerException if solrUrl, solrMode or solrCollection is null
+     * @throws IllegalArgumentException if solrCommitWithinMs is not positive
+     */
+    public void validate() {
+        Preconditions.checkNotNull(solrUrl, "solrUrl property not set.");
+        Preconditions.checkNotNull(solrMode, "solrMode property not set.");
+        Preconditions.checkNotNull(solrCollection, "solrCollection property not set.");
+        Preconditions.checkArgument(solrCommitWithinMs > 0, "solrCommitWithinMs must be a positive integer.");
+    }
+}
diff --git a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..fdf223a
--- /dev/null
+++ b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: solr
+description: Writes data into solr collection
+sinkClass: org.apache.pulsar.io.solr.SolrGenericRecordSink
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
new file mode 100644
index 0000000..72462e7
--- /dev/null
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link SolrGenericRecordSink}, run against a standalone Solr instance that
+ * {@link SolrServerUtil} starts before each test method.
+ */
+@Slf4j
+public class SolrGenericRecordSinkTest {
+
+    private SolrServerUtil solrServerUtil;
+
+    /**
+     * Simple POJO whose Avro schema is used to build the test {@link GenericRecord}.
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        solrServerUtil = new SolrServerUtil(8983);
+        solrServerUtil.startStandaloneSolr();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        solrServerUtil.stopStandaloneSolr();
+    }
+
+    @Test
+    public void testOpenSink() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("solrUrl", "http://localhost:8983/solr");
+        configs.put("solrMode", "Standalone");
+        configs.put("solrCollection", "techproducts");
+        configs.put("solrCommitWithinMs", "100");
+        configs.put("username", "");
+        configs.put("password", "");
+
+        SolrGenericRecordSink sink = new SolrGenericRecordSink();
+
+        // prepare a foo Record
+        Foo obj = new Foo();
+        obj.setField1("FakeField1");
+        obj.setField2("FakeField2");
+        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+
+        byte[] bytes = schema.encode(obj);
+        ByteBuf payload = Unpooled.copiedBuffer(bytes);
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
+
+        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
+        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
+            .message(message)
+            .topicName("fake_topic_name")
+            .build();
+
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+            obj.toString(),
+            message.getValue().toString(),
+            record.getValue().toString());
+
+        // open should succeed; always close the sink so the Solr client it created is released.
+        try {
+            sink.open(configs, null);
+        } finally {
+            sink.close();
+        }
+    }
+}
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java
new file mode 100644
index 0000000..033d0d2
--- /dev/null
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+
+import java.io.File;
+
+@Slf4j
+public class SolrServerUtil {
+ private JettySolrRunner standaloneSolr;
+ private int port;
+
+ public SolrServerUtil(int port) {
+ this.port = port;
+ }
+
+ public void startStandaloneSolr() throws Exception {
+ if (standaloneSolr != null) {
+ throw new IllegalStateException("Test is already running a standalone Solr instance " +
+ standaloneSolr.getBaseUrl() + "! This indicates a bug in the unit test logic.");
+ }
+
+ File solrHomeDir = new File(FileUtils.getTempDirectory().getPath() + "/solr_home");
+ String solrXml = "solr.xml";
+ FileUtils.copyFile(getFile(solrXml), new File(solrHomeDir.getAbsolutePath() + "/" + solrXml));
+ File solrLogDir = new File(solrHomeDir.getPath() + "/solr_logs");
+
+ createTempDir(solrHomeDir);
+ createTempDir(solrLogDir);
+
+ System.setProperty("host", "localhost");
+ System.setProperty("jetty.port", String.valueOf(port));
+ System.setProperty("solr.log.dir", solrLogDir.getAbsolutePath());
+
+ standaloneSolr = new JettySolrRunner(solrHomeDir.getAbsolutePath(), "/solr", port);
+ Thread bg = new Thread() {
+ public void run() {
+ try {
+ standaloneSolr.start();
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ };
+ bg.start();
+ }
+
+ public void stopStandaloneSolr() {
+ if (standaloneSolr != null) {
+ try {
+ standaloneSolr.stop();
+ } catch (Exception e) {
+ log.error("Failed to stop standalone solr.");
+ }
+ }
+ }
+
+ private File getFile(String name) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(name).getFile());
+ }
+
+ private void createTempDir(File file) {
+ if (!file.exists() && !file.isDirectory()) {
+ file.mkdirs();
+ }
+ }
+}
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
new file mode 100644
index 0000000..493542f
--- /dev/null
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.solr;
+
+import com.google.common.collect.Lists;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Tests for {@link SolrSinkConfig} loading and validation, and for the ZooKeeper
+ * connect-string parsing used in SolrCloud mode.
+ *
+ * <p>Note: TestNG's {@code assertEquals} takes {@code (actual, expected)}.
+ */
+public class SolrSinkConfigTest {
+
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        String path = yamlFile.getAbsolutePath();
+        SolrSinkConfig config = SolrSinkConfig.load(path);
+        assertNotNull(config);
+        assertEquals(config.getSolrUrl(), "localhost:2181,localhost:2182/chroot");
+        assertEquals(config.getSolrMode(), "SolrCloud");
+        assertEquals(config.getSolrCollection(), "techproducts");
+        assertEquals(config.getSolrCommitWithinMs(), 100);
+        assertEquals(config.getUsername(), "fakeuser");
+        assertEquals(config.getPassword(), "fake@123");
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        SolrSinkConfig config = SolrSinkConfig.load(validConfigMap());
+        assertNotNull(config);
+        assertEquals(config.getSolrUrl(), "localhost:2181,localhost:2182/chroot");
+        assertEquals(config.getSolrMode(), "SolrCloud");
+        assertEquals(config.getSolrCollection(), "techproducts");
+        assertEquals(config.getSolrCommitWithinMs(), 100);
+        assertEquals(config.getUsername(), "fakeuser");
+        assertEquals(config.getPassword(), "fake@123");
+    }
+
+    @Test
+    public final void validValidateTest() throws IOException {
+        SolrSinkConfig config = SolrSinkConfig.load(validConfigMap());
+        config.validate();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class,
+        expectedExceptionsMessageRegExp = "solrUrl property not set.")
+    public final void missingSolrUrlValidateTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.remove("solrUrl");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "solrCommitWithinMs must be a positive integer.")
+    public final void invalidCommitWithinMsTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.put("solrCommitWithinMs", "-100");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "No enum constant org.apache.pulsar.io.solr.SolrAbstractSink.SolrMode.NOTSUPPORT")
+    public final void invalidClientModeTest() throws IOException {
+        Map<String, Object> map = validConfigMap();
+        map.put("solrMode", "NotSupport");
+
+        SolrSinkConfig config = SolrSinkConfig.load(map);
+        config.validate();
+
+        SolrAbstractSink.SolrMode.valueOf(config.getSolrMode().toUpperCase());
+    }
+
+    @Test
+    public final void validZkChrootTest() throws IOException {
+        SolrSinkConfig config = SolrSinkConfig.load(validConfigMap());
+        config.validate();
+
+        // NOTE(review): this re-implements the connect-string parsing of
+        // SolrAbstractSink.getClient instead of calling it; keep the two in sync.
+        String url = config.getSolrUrl();
+        int chrootIndex = url.indexOf("/");
+        Optional<String> chroot = Optional.empty();
+        if (chrootIndex > 0) {
+            chroot = Optional.of(url.substring(chrootIndex));
+        }
+        String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url;
+        List<String> zkHosts = Arrays.asList(zkUrls.split(","));
+
+        List<String> expectedZkHosts = Lists.newArrayList("localhost:2181", "localhost:2182");
+
+        assertEquals(chroot, Optional.of("/chroot"));
+        assertEquals(zkHosts, expectedZkHosts);
+    }
+
+    /** Returns a mutable config map with every property set to a valid value. */
+    private static Map<String, Object> validConfigMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
+        map.put("solrMode", "SolrCloud");
+        map.put("solrCollection", "techproducts");
+        map.put("solrCommitWithinMs", "100");
+        map.put("username", "fakeuser");
+        map.put("password", "fake@123");
+        return map;
+    }
+
+    /**
+     * Resolves a test classpath resource to a file, decoding its URL via the URI so that
+     * paths containing spaces or other escaped characters work.
+     */
+    private File getFile(String name) {
+        java.net.URL resource = getClass().getClassLoader().getResource(name);
+        if (resource == null) {
+            throw new IllegalStateException("Test resource not found on classpath: " + name);
+        }
+        try {
+            return new File(resource.toURI());
+        } catch (java.net.URISyntaxException e) {
+            throw new IllegalStateException("Invalid URI for test resource: " + name, e);
+        }
+    }
+}
diff --git a/pulsar-io/solr/src/test/resources/sinkConfig.yaml b/pulsar-io/solr/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..d96b353
--- /dev/null
+++ b/pulsar-io/solr/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"solrUrl": "localhost:2181,localhost:2182/chroot",
+"solrMode": "SolrCloud",
+"solrCollection": "techproducts",
+"solrCommitWithinMs": "100",
+"username": "fakeuser",
+"password": "fake@123"
+}
diff --git a/pulsar-io/solr/src/test/resources/solr.xml b/pulsar-io/solr/src/test/resources/solr.xml
new file mode 100644
index 0000000..495a71f
--- /dev/null
+++ b/pulsar-io/solr/src/test/resources/solr.xml
@@ -0,0 +1,38 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<solr>
+ <solrcloud>
+ <str name="host">${host:}</str>
+ <int name="hostPort">${jetty.port:8983}</int>
+ <str name="hostContext">${hostContext:solr}</str>
+ <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
+ <int name="zkClientTimeout">${zkClientTimeout:30000}</int>
+ <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:600000}</int>
+ <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
+ <str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
+ <str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
+ </solrcloud>
+ <shardHandlerFactory name="shardHandlerFactory"
+ class="HttpShardHandlerFactory">
+ <int name="socketTimeout">${socketTimeout:600000}</int>
+ <int name="connTimeout">${connTimeout:60000}</int>
+ </shardHandlerFactory>
+</solr>
\ No newline at end of file
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 2f166ac..6f94d3c 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -23,3 +23,4 @@ Pulsar Functions cluster.
- [File Source Connector](io-file.md#source)
- [Hdfs Sink Connector](io-hdfs.md#sink)
- [MongoDB Sink Connector](io-mongo.md#sink)
+- [Solr Sink Connector](io-solr.md#sink)
diff --git a/site2/docs/io-solr.md b/site2/docs/io-solr.md
new file mode 100644
index 0000000..7fdcf36
--- /dev/null
+++ b/site2/docs/io-solr.md
@@ -0,0 +1,21 @@
+---
+id: io-solr
+title: Solr Connector
+sidebar_label: Solr Connector
+---
+
+## Sink
+
+The Solr Sink Connector pulls messages from Pulsar topics and persists them
+to a Solr collection.
+
+## Sink Configuration Options
+
+| Name | Default | Required | Description |
+|------|---------|----------|-------------|
+| `solrUrl` | `null` | `true` | In SolrCloud mode, comma-separated ZooKeeper hosts with an optional chroot (e.g. `localhost:2181,localhost:2182/chroot`); in Standalone mode, the Solr base URL (e.g. `http://localhost:8983/solr`). |
+| `solrMode` | `SolrCloud` | `true` | The client mode to use when interacting with the Solr cluster. Possible values [Standalone, SolrCloud]. |
+| `solrCollection` | `null` | `true` | Solr collection name to which records need to be written. |
+| `solrCommitWithinMs` | `10` | `false` | Time in milliseconds within which Solr commits each update; defaults to 10 ms if not set. |
+| `username` | `null` | `false` | The username to use for basic authentication. |
+| `password` | `null` | `false` | The password to use for basic authentication. |
\ No newline at end of file