You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/14 12:18:08 UTC
[pulsar] 04/14: Remove Solr
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_rootless
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 53120ffa6799fc4b9883572b8f8d286856db0244
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu May 13 14:26:10 2021 +0200
Remove Solr
---
pulsar-io/pom.xml | 1 -
pulsar-io/solr/pom.xml | 86 -----------
.../apache/pulsar/io/solr/SolrAbstractSink.java | 131 ----------------
.../pulsar/io/solr/SolrGenericRecordSink.java | 53 -------
.../org/apache/pulsar/io/solr/SolrSinkConfig.java | 98 ------------
.../resources/META-INF/services/pulsar-io.yaml | 23 ---
.../pulsar/io/solr/SolrGenericRecordSinkTest.java | 114 --------------
.../org/apache/pulsar/io/solr/SolrServerUtil.java | 91 -----------
.../apache/pulsar/io/solr/SolrSinkConfigTest.java | 168 ---------------------
pulsar-io/solr/src/test/resources/sinkConfig.yaml | 27 ----
pulsar-io/solr/src/test/resources/solr.xml | 38 -----
11 files changed, 830 deletions(-)
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 40b476a..a3911c0 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -64,7 +64,6 @@
<module>mongo</module>
<module>flume</module>
<module>redis</module>
- <module>solr</module>
<module>influxdb</module>
<module>dynamodb</module>
<module>nsq</module>
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
deleted file mode 100644
index a8d6103..0000000
--- a/pulsar-io/solr/pom.xml
+++ /dev/null
@@ -1,86 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>pulsar-io</artifactId>
- <groupId>org.apache.pulsar</groupId>
- <version>2.7.2.1.0.0</version>
- </parent>
-
- <properties>
- <solr.version>8.6.3</solr.version>
- </properties>
-
- <artifactId>pulsar-io-solr</artifactId>
- <name>Pulsar IO :: Solr</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>pulsar-io-core</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-functions-instance</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.solr</groupId>
- <artifactId>solr-solrj</artifactId>
- <version>${solr.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.solr</groupId>
- <artifactId>solr-core</artifactId>
- <version>${solr.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.4</version>
- </dependency>
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <version>3.2.2</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-nar-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
\ No newline at end of file
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
deleted file mode 100644
index d7bcb12..0000000
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import com.google.common.base.Strings;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.common.SolrInputDocument;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A simple abstract class for Solr sink
- */
-@Slf4j
-public abstract class SolrAbstractSink<T> implements Sink<T> {
-
- private SolrSinkConfig solrSinkConfig;
- private SolrClient client;
- private boolean enableBasicAuth;
-
- @Override
- public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
- solrSinkConfig = SolrSinkConfig.load(config);
- solrSinkConfig.validate();
-
- enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername());
-
- SolrMode solrMode;
- try {
- solrMode = SolrMode.valueOf(solrSinkConfig.getSolrMode().toUpperCase());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Illegal Solr mode, valid values are: "
- + Arrays.asList(SolrMode.values()));
- }
-
- client = getClient(solrMode, solrSinkConfig.getSolrUrl());
- }
-
- @Override
- public void write(Record<T> record) {
- UpdateRequest updateRequest = new UpdateRequest();
- if (solrSinkConfig.getSolrCommitWithinMs() > 0) {
- updateRequest.setCommitWithin(solrSinkConfig.getSolrCommitWithinMs());
- }
- if (enableBasicAuth) {
- updateRequest.setBasicAuthCredentials(
- solrSinkConfig.getUsername(),
- solrSinkConfig.getPassword()
- );
- }
-
- SolrInputDocument document = convert(record);
- updateRequest.add(document);
-
- try {
- UpdateResponse updateResponse = updateRequest.process(client, solrSinkConfig.getSolrCollection());
- if (updateResponse.getStatus() == 0) {
- record.ack();
- } else {
- record.fail();
- }
- } catch (SolrServerException | IOException e) {
- record.fail();
- log.warn("Solr update document exception ", e);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (client != null) {
- client.close();
- }
- }
-
- // convert record as a Solr document
- public abstract SolrInputDocument convert(Record<T> message);
-
- public static SolrClient getClient(SolrMode solrMode, String url) {
- SolrClient solrClient = null;
- if (solrMode.equals(SolrMode.STANDALONE)) {
- HttpSolrClient.Builder builder = new HttpSolrClient.Builder(url);
- solrClient = builder.build();
- }
- if (solrMode.equals(SolrMode.SOLRCLOUD)) {
- int chrootIndex = url.indexOf("/");
- Optional<String> chroot = Optional.empty();
- if (chrootIndex > 0) {
- chroot = Optional.of(url.substring(chrootIndex));
- }
- String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url;
- List<String> zkHosts = Arrays.asList(zkUrls.split(","));
- CloudSolrClient.Builder builder = new CloudSolrClient.Builder(zkHosts, chroot);
- solrClient = builder.build();
- }
- return solrClient;
- }
-
- public enum SolrMode {
- STANDALONE,
- SOLRCLOUD
- }
-}
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java
deleted file mode 100644
index e1df871..0000000
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.schema.Field;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.apache.solr.common.SolrInputDocument;
-
-import java.util.List;
-
-/**
- * A simple Solr sink, which interprets input Record in generic record.
- */
-@Connector(
- name = "solr",
- type = IOType.SINK,
- help = "The SolrGenericRecordSink is used for moving messages from Pulsar to Solr.",
- configClass = SolrSinkConfig.class
-)
-@Slf4j
-public class SolrGenericRecordSink extends SolrAbstractSink<GenericRecord> {
- @Override
- public SolrInputDocument convert(Record<GenericRecord> message) {
- SolrInputDocument doc = new SolrInputDocument();
- GenericRecord record = message.getValue();
- List<Field> fields = record.getFields();
- for (Field field : fields) {
- Object fieldValue = record.getField(field);
- doc.setField(field.getName(), fieldValue);
- }
- return doc;
- }
-}
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
deleted file mode 100644
index b761b12..0000000
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.google.common.base.Preconditions;
-import lombok.Data;
-import lombok.experimental.Accessors;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Configuration class for the Solr Sink Connector.
- */
-@Data
-@Accessors(chain = true)
-public class SolrSinkConfig implements Serializable {
-
- private static final long serialVersionUID = -4849066206354610110L;
-
- @FieldDoc(
- required = true,
- defaultValue = "",
- help = "Comma separated zookeeper hosts with chroot used in SolrCloud mode (eg: localhost:2181,localhost:2182/chroot)"
- + " or Url to connect to solr used in Standalone mode (e.g. localhost:8983/solr)"
- )
- private String solrUrl;
-
- @FieldDoc(
- required = true,
- defaultValue = "SolrCloud",
- help = "The client mode to use when interacting with the Solr cluster. Possible values [Standalone, SolrCloud]")
- private String solrMode = "SolrCloud";
-
- @FieldDoc(
- required = true,
- defaultValue = "",
- help = "Solr collection name to which records need to be written")
- private String solrCollection;
-
- @FieldDoc(
- required = false,
- defaultValue = "10",
- help = "Commit within milli seconds for solr update, if none passes defaults to 10 ms")
- private int solrCommitWithinMs = 10;
-
- @FieldDoc(
- required = false,
- defaultValue = "",
- sensitive = true,
- help = "The username to use for basic authentication")
- private String username;
-
- @FieldDoc(
- required = false,
- defaultValue = "",
- sensitive = true,
- help = "The password to use for basic authentication")
- private String password;
-
- public static SolrSinkConfig load(String yamlFile) throws IOException {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- return mapper.readValue(new File(yamlFile), SolrSinkConfig.class);
- }
-
- public static SolrSinkConfig load(Map<String, Object> map) throws IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(new ObjectMapper().writeValueAsString(map), SolrSinkConfig.class);
- }
-
- public void validate() {
- Preconditions.checkNotNull(solrUrl, "solrUrl property not set.");
- Preconditions.checkNotNull(solrMode, "solrMode property not set.");
- Preconditions.checkNotNull(solrCollection, "solrCollection property not set.");
- Preconditions.checkArgument(solrCommitWithinMs > 0, "solrCommitWithinMs must be a positive integer.");
- }
-}
diff --git a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
deleted file mode 100644
index 26347f5..0000000
--- a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-name: solr
-description: Writes data into solr collection
-sinkClass: org.apache.pulsar.io.solr.SolrGenericRecordSink
-sinkConfigClass: org.apache.pulsar.io.solr.SolrSinkConfig
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
deleted file mode 100644
index 1bd0d9d..0000000
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.source.PulsarRecord;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * solr Sink test
- */
-@Slf4j
-public class SolrGenericRecordSinkTest {
-
- private SolrServerUtil solrServerUtil;
- private Message<GenericRecord> message;
-
- /**
- * A Simple class to test solr class
- */
- @Data
- public static class Foo {
- private String field1;
- private String field2;
- }
-
- @BeforeMethod
- public void setUp() throws Exception {
- solrServerUtil = new SolrServerUtil(8983);
- solrServerUtil.startStandaloneSolr();
- }
-
- @AfterMethod
- public void tearDown() throws Exception {
- solrServerUtil.stopStandaloneSolr();
- }
-
- @Test
- public void TestOpenAndWriteSink() throws Exception {
- message = mock(MessageImpl.class);
- Map<String, Object> configs = new HashMap<>();
- configs.put("solrUrl", "http://localhost:8983/solr");
- configs.put("solrMode", "Standalone");
- configs.put("solrCollection", "techproducts");
- configs.put("solrCommitWithinMs", "100");
- configs.put("username", "");
- configs.put("password", "");
- GenericSchema<GenericRecord> genericAvroSchema;
-
- SolrGenericRecordSink sink = new SolrGenericRecordSink();
-
- // prepare a foo Record
- Foo obj = new Foo();
- obj.setField1("FakeFiled1");
- obj.setField2("FakeFiled1");
- AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
-
- byte[] bytes = schema.encode(obj);
- AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
- autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
-
- Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
- .message(message)
- .topicName("fake_topic_name")
- .build();
-
- genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
-
- when(message.getValue())
- .thenReturn(genericAvroSchema.decode(bytes));
-
- log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
- obj.toString(),
- message.getValue().toString(),
- record.getValue().toString());
-
- // open should success
- sink.open(configs, null);
- }
-}
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java
deleted file mode 100644
index 033d0d2..0000000
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.FileUtils;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-
-import java.io.File;
-
-@Slf4j
-public class SolrServerUtil {
- private JettySolrRunner standaloneSolr;
- private int port;
-
- public SolrServerUtil(int port) {
- this.port = port;
- }
-
- public void startStandaloneSolr() throws Exception {
- if (standaloneSolr != null) {
- throw new IllegalStateException("Test is already running a standalone Solr instance " +
- standaloneSolr.getBaseUrl() + "! This indicates a bug in the unit test logic.");
- }
-
- File solrHomeDir = new File(FileUtils.getTempDirectory().getPath() + "/solr_home");
- String solrXml = "solr.xml";
- FileUtils.copyFile(getFile(solrXml), new File(solrHomeDir.getAbsolutePath() + "/" + solrXml));
- File solrLogDir = new File(solrHomeDir.getPath() + "/solr_logs");
-
- createTempDir(solrHomeDir);
- createTempDir(solrLogDir);
-
- System.setProperty("host", "localhost");
- System.setProperty("jetty.port", String.valueOf(port));
- System.setProperty("solr.log.dir", solrLogDir.getAbsolutePath());
-
- standaloneSolr = new JettySolrRunner(solrHomeDir.getAbsolutePath(), "/solr", port);
- Thread bg = new Thread() {
- public void run() {
- try {
- standaloneSolr.start();
- } catch (Exception e) {
- if (e instanceof RuntimeException) {
- throw (RuntimeException)e;
- } else {
- throw new RuntimeException(e);
- }
- }
- }
- };
- bg.start();
- }
-
- public void stopStandaloneSolr() {
- if (standaloneSolr != null) {
- try {
- standaloneSolr.stop();
- } catch (Exception e) {
- log.error("Failed to stop standalone solr.");
- }
- }
- }
-
- private File getFile(String name) {
- ClassLoader classLoader = getClass().getClassLoader();
- return new File(classLoader.getResource(name).getFile());
- }
-
- private void createTempDir(File file) {
- if (!file.exists() && !file.isDirectory()) {
- file.mkdirs();
- }
- }
-}
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
deleted file mode 100644
index 493542f..0000000
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import com.google.common.collect.Lists;
-import org.testng.annotations.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-/**
- * SolrSinkConfig test
- */
-public class SolrSinkConfigTest {
-
- @Test
- public final void loadFromYamlFileTest() throws IOException {
- File yamlFile = getFile("sinkConfig.yaml");
- String path = yamlFile.getAbsolutePath();
- SolrSinkConfig config = SolrSinkConfig.load(path);
- assertNotNull(config);
- assertEquals("localhost:2181,localhost:2182/chroot", config.getSolrUrl());
- assertEquals("SolrCloud", config.getSolrMode());
- assertEquals("techproducts", config.getSolrCollection());
- assertEquals(Integer.parseInt("100"), config.getSolrCommitWithinMs());
- assertEquals("fakeuser", config.getUsername());
- assertEquals("fake@123", config.getPassword());
- }
-
- @Test
- public final void loadFromMapTest() throws IOException {
- Map<String, Object> map = new HashMap<>();
- map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
- map.put("solrMode", "SolrCloud");
- map.put("solrCollection", "techproducts");
- map.put("solrCommitWithinMs", "100");
- map.put("username", "fakeuser");
- map.put("password", "fake@123");
-
- SolrSinkConfig config = SolrSinkConfig.load(map);
- assertNotNull(config);
- assertEquals("localhost:2181,localhost:2182/chroot", config.getSolrUrl());
- assertEquals("SolrCloud", config.getSolrMode());
- assertEquals("techproducts", config.getSolrCollection());
- assertEquals(Integer.parseInt("100"), config.getSolrCommitWithinMs());
- assertEquals("fakeuser", config.getUsername());
- assertEquals("fake@123", config.getPassword());
- }
-
- @Test
- public final void validValidateTest() throws IOException {
- Map<String, Object> map = new HashMap<>();
- map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
- map.put("solrMode", "SolrCloud");
- map.put("solrCollection", "techproducts");
- map.put("solrCommitWithinMs", "100");
- map.put("username", "fakeuser");
- map.put("password", "fake@123");
-
- SolrSinkConfig config = SolrSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "solrUrl property not set.")
- public final void missingValidValidateSolrModeTest() throws IOException {
- Map<String, Object> map = new HashMap<>();
- map.put("solrMode", "SolrCloud");
- map.put("solrCollection", "techproducts");
- map.put("solrCommitWithinMs", "100");
- map.put("username", "fakeuser");
- map.put("password", "fake@123");
-
- SolrSinkConfig config = SolrSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "solrCommitWithinMs must be a positive integer.")
- public final void invalidBatchTimeMsTest() throws IOException {
- Map<String, Object> map = new HashMap<>();
- map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
- map.put("solrMode", "SolrCloud");
- map.put("solrCollection", "techproducts");
- map.put("solrCommitWithinMs", "-100");
- map.put("username", "fakeuser");
- map.put("password", "fake@123");
-
- SolrSinkConfig config = SolrSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "No enum constant org.apache.pulsar.io.solr.SolrAbstractSink.SolrMode.NOTSUPPORT")
- public final void invalidClientModeTest() throws IOException {
- Map<String, Object> map = new HashMap<>();
- map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
- map.put("solrMode", "NotSupport");
- map.put("solrCollection", "techproducts");
- map.put("solrCommitWithinMs", "100");
- map.put("username", "fakeuser");
- map.put("password", "fake@123");
-
- SolrSinkConfig config = SolrSinkConfig.load(map);
- config.validate();
-
- SolrAbstractSink.SolrMode.valueOf(config.getSolrMode().toUpperCase());
- }
-
- @Test
- public final void validZkChrootTest() throws IOException {
- Map<String, Object> map = new HashMap<>();
- map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
- map.put("solrMode", "SolrCloud");
- map.put("solrCollection", "techproducts");
- map.put("solrCommitWithinMs", "100");
- map.put("username", "fakeuser");
- map.put("password", "fake@123");
-
- SolrSinkConfig config = SolrSinkConfig.load(map);
- config.validate();
-
- String url = config.getSolrUrl();
- int chrootIndex = url.indexOf("/");
- Optional<String> chroot = Optional.empty();
- if (chrootIndex > 0) {
- chroot = Optional.of(url.substring(chrootIndex));
- }
- String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url;
- List<String> zkHosts = Arrays.asList(zkUrls.split(","));
-
- List<String> expectedZkHosts = Lists.newArrayList();
- expectedZkHosts.add("localhost:2181");
- expectedZkHosts.add("localhost:2182");
-
- assertEquals("/chroot", chroot.get());
- assertEquals(expectedZkHosts, zkHosts);
- }
-
- private File getFile(String name) {
- ClassLoader classLoader = getClass().getClassLoader();
- return new File(classLoader.getResource(name).getFile());
- }
-}
diff --git a/pulsar-io/solr/src/test/resources/sinkConfig.yaml b/pulsar-io/solr/src/test/resources/sinkConfig.yaml
deleted file mode 100644
index d96b353..0000000
--- a/pulsar-io/solr/src/test/resources/sinkConfig.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
-"solrUrl": "localhost:2181,localhost:2182/chroot",
-"solrMode": "SolrCloud",
-"solrCollection": "techproducts",
-"solrCommitWithinMs": "100",
-"username": "fakeuser",
-"password": "fake@123"
-}
diff --git a/pulsar-io/solr/src/test/resources/solr.xml b/pulsar-io/solr/src/test/resources/solr.xml
deleted file mode 100644
index 495a71f..0000000
--- a/pulsar-io/solr/src/test/resources/solr.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<solr>
- <solrcloud>
- <str name="host">${host:}</str>
- <int name="hostPort">${jetty.port:8983}</int>
- <str name="hostContext">${hostContext:solr}</str>
- <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
- <int name="zkClientTimeout">${zkClientTimeout:30000}</int>
- <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:600000}</int>
- <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
- <str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
- <str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
- </solrcloud>
- <shardHandlerFactory name="shardHandlerFactory"
- class="HttpShardHandlerFactory">
- <int name="socketTimeout">${socketTimeout:600000}</int>
- <int name="connTimeout">${connTimeout:60000}</int>
- </shardHandlerFactory>
-</solr>
\ No newline at end of file