You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/14 12:18:08 UTC

[pulsar] 04/14: Remove Solr

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_rootless
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 53120ffa6799fc4b9883572b8f8d286856db0244
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu May 13 14:26:10 2021 +0200

    Remove Solr
---
 pulsar-io/pom.xml                                  |   1 -
 pulsar-io/solr/pom.xml                             |  86 -----------
 .../apache/pulsar/io/solr/SolrAbstractSink.java    | 131 ----------------
 .../pulsar/io/solr/SolrGenericRecordSink.java      |  53 -------
 .../org/apache/pulsar/io/solr/SolrSinkConfig.java  |  98 ------------
 .../resources/META-INF/services/pulsar-io.yaml     |  23 ---
 .../pulsar/io/solr/SolrGenericRecordSinkTest.java  | 114 --------------
 .../org/apache/pulsar/io/solr/SolrServerUtil.java  |  91 -----------
 .../apache/pulsar/io/solr/SolrSinkConfigTest.java  | 168 ---------------------
 pulsar-io/solr/src/test/resources/sinkConfig.yaml  |  27 ----
 pulsar-io/solr/src/test/resources/solr.xml         |  38 -----
 11 files changed, 830 deletions(-)

diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 40b476a..a3911c0 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -64,7 +64,6 @@
         <module>mongo</module>
         <module>flume</module>
         <module>redis</module>
-        <module>solr</module>
         <module>influxdb</module>
         <module>dynamodb</module>
         <module>nsq</module>
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
deleted file mode 100644
index a8d6103..0000000
--- a/pulsar-io/solr/pom.xml
+++ /dev/null
@@ -1,86 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>pulsar-io</artifactId>
-        <groupId>org.apache.pulsar</groupId>
-        <version>2.7.2.1.0.0</version>
-    </parent>
-
-    <properties>
-        <solr.version>8.6.3</solr.version>
-    </properties>
-
-    <artifactId>pulsar-io-solr</artifactId>
-    <name>Pulsar IO :: Solr</name>
-
-    <dependencies>
-        <dependency>
-            <groupId>${project.parent.groupId}</groupId>
-            <artifactId>pulsar-io-core</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>pulsar-functions-instance</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>pulsar-client-original</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.solr</groupId>
-            <artifactId>solr-solrj</artifactId>
-            <version>${solr.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.solr</groupId>
-            <artifactId>solr-core</artifactId>
-            <version>${solr.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-            <version>3.4</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-collections</groupId>
-            <artifactId>commons-collections</artifactId>
-            <version>3.2.2</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.nifi</groupId>
-                <artifactId>nifi-nar-maven-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
\ No newline at end of file
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
deleted file mode 100644
index d7bcb12..0000000
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import com.google.common.base.Strings;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.common.SolrInputDocument;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A simple abstract class for Solr sink
- */
-@Slf4j
-public abstract class SolrAbstractSink<T> implements Sink<T> {
-
-    private SolrSinkConfig solrSinkConfig;
-    private SolrClient client;
-    private boolean enableBasicAuth;
-
-    @Override
-    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        solrSinkConfig = SolrSinkConfig.load(config);
-        solrSinkConfig.validate();
-
-        enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername());
-
-        SolrMode solrMode;
-        try {
-            solrMode = SolrMode.valueOf(solrSinkConfig.getSolrMode().toUpperCase());
-        } catch (IllegalArgumentException e) {
-            throw new IllegalArgumentException("Illegal Solr mode, valid values are: "
-                + Arrays.asList(SolrMode.values()));
-        }
-
-        client = getClient(solrMode, solrSinkConfig.getSolrUrl());
-    }
-
-    @Override
-    public void write(Record<T> record) {
-        UpdateRequest updateRequest = new UpdateRequest();
-        if (solrSinkConfig.getSolrCommitWithinMs() > 0) {
-            updateRequest.setCommitWithin(solrSinkConfig.getSolrCommitWithinMs());
-        }
-        if (enableBasicAuth) {
-            updateRequest.setBasicAuthCredentials(
-                solrSinkConfig.getUsername(),
-                solrSinkConfig.getPassword()
-            );
-        }
-
-        SolrInputDocument document = convert(record);
-        updateRequest.add(document);
-
-        try {
-            UpdateResponse updateResponse = updateRequest.process(client, solrSinkConfig.getSolrCollection());
-            if (updateResponse.getStatus() == 0) {
-                record.ack();
-            } else {
-                record.fail();
-            }
-        } catch (SolrServerException | IOException e) {
-            record.fail();
-            log.warn("Solr update document exception ", e);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (client != null) {
-            client.close();
-        }
-    }
-
-    // convert record as a Solr document
-    public abstract SolrInputDocument convert(Record<T> message);
-
-    public static SolrClient getClient(SolrMode solrMode, String url) {
-        SolrClient solrClient = null;
-        if (solrMode.equals(SolrMode.STANDALONE)) {
-            HttpSolrClient.Builder builder = new HttpSolrClient.Builder(url);
-            solrClient = builder.build();
-        }
-        if (solrMode.equals(SolrMode.SOLRCLOUD)) {
-            int chrootIndex = url.indexOf("/");
-            Optional<String> chroot = Optional.empty();
-            if (chrootIndex > 0) {
-                chroot = Optional.of(url.substring(chrootIndex));
-            }
-            String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url;
-            List<String> zkHosts = Arrays.asList(zkUrls.split(","));
-            CloudSolrClient.Builder builder = new CloudSolrClient.Builder(zkHosts, chroot);
-            solrClient = builder.build();
-        }
-        return solrClient;
-    }
-
-    public enum SolrMode {
-        STANDALONE,
-        SOLRCLOUD
-    }
-}
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java
deleted file mode 100644
index e1df871..0000000
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.schema.Field;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.apache.solr.common.SolrInputDocument;
-
-import java.util.List;
-
-/**
- * A simple Solr sink, which interprets input Record in generic record.
- */
-@Connector(
-    name = "solr",
-    type = IOType.SINK,
-    help = "The SolrGenericRecordSink is used for moving messages from Pulsar to Solr.",
-    configClass = SolrSinkConfig.class
-)
-@Slf4j
-public class SolrGenericRecordSink extends SolrAbstractSink<GenericRecord> {
-    @Override
-    public SolrInputDocument convert(Record<GenericRecord> message) {
-        SolrInputDocument doc = new SolrInputDocument();
-        GenericRecord record = message.getValue();
-        List<Field> fields = record.getFields();
-        for (Field field : fields) {
-            Object fieldValue = record.getField(field);
-            doc.setField(field.getName(), fieldValue);
-        }
-        return doc;
-    }
-}
diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
deleted file mode 100644
index b761b12..0000000
--- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.google.common.base.Preconditions;
-import lombok.Data;
-import lombok.experimental.Accessors;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Configuration class for the Solr Sink Connector.
- */
-@Data
-@Accessors(chain = true)
-public class SolrSinkConfig implements Serializable {
-
-    private static final long serialVersionUID = -4849066206354610110L;
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help = "Comma separated zookeeper hosts with chroot used in SolrCloud mode (eg: localhost:2181,localhost:2182/chroot)"
-            + " or Url to connect to solr used in Standalone mode (e.g. localhost:8983/solr)"
-    )
-    private String solrUrl;
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "SolrCloud",
-        help = "The client mode to use when interacting with the Solr cluster. Possible values [Standalone, SolrCloud]")
-    private String solrMode = "SolrCloud";
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help = "Solr collection name to which records need to be written")
-    private String solrCollection;
-
-    @FieldDoc(
-        required = false,
-        defaultValue = "10",
-        help = "Commit within milli seconds for solr update, if none passes defaults to 10 ms")
-    private int solrCommitWithinMs = 10;
-
-    @FieldDoc(
-        required = false,
-        defaultValue = "",
-        sensitive = true,
-        help = "The username to use for basic authentication")
-    private String username;
-
-    @FieldDoc(
-        required = false,
-        defaultValue = "",
-        sensitive = true,
-        help = "The password to use for basic authentication")
-    private String password;
-
-    public static SolrSinkConfig load(String yamlFile) throws IOException {
-        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return mapper.readValue(new File(yamlFile), SolrSinkConfig.class);
-    }
-
-    public static SolrSinkConfig load(Map<String, Object> map) throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), SolrSinkConfig.class);
-    }
-
-    public void validate() {
-        Preconditions.checkNotNull(solrUrl, "solrUrl property not set.");
-        Preconditions.checkNotNull(solrMode, "solrMode property not set.");
-        Preconditions.checkNotNull(solrCollection, "solrCollection property not set.");
-        Preconditions.checkArgument(solrCommitWithinMs > 0, "solrCommitWithinMs must be a positive integer.");
-    }
-}
diff --git a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
deleted file mode 100644
index 26347f5..0000000
--- a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-name: solr
-description: Writes data into solr collection
-sinkClass: org.apache.pulsar.io.solr.SolrGenericRecordSink
-sinkConfigClass: org.apache.pulsar.io.solr.SolrSinkConfig
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
deleted file mode 100644
index 1bd0d9d..0000000
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.source.PulsarRecord;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * solr Sink test
- */
-@Slf4j
-public class SolrGenericRecordSinkTest {
-
-    private SolrServerUtil solrServerUtil;
-    private Message<GenericRecord> message;
-
-    /**
-     * A Simple class to test solr class
-     */
-    @Data
-    public static class Foo {
-        private String field1;
-        private String field2;
-    }
-
-    @BeforeMethod
-    public void setUp() throws Exception {
-        solrServerUtil = new SolrServerUtil(8983);
-        solrServerUtil.startStandaloneSolr();
-    }
-
-    @AfterMethod
-    public void tearDown() throws Exception {
-        solrServerUtil.stopStandaloneSolr();
-    }
-
-    @Test
-    public void TestOpenAndWriteSink() throws Exception {
-        message = mock(MessageImpl.class);
-        Map<String, Object> configs = new HashMap<>();
-        configs.put("solrUrl", "http://localhost:8983/solr");
-        configs.put("solrMode", "Standalone");
-        configs.put("solrCollection", "techproducts");
-        configs.put("solrCommitWithinMs", "100");
-        configs.put("username", "");
-        configs.put("password", "");
-        GenericSchema<GenericRecord> genericAvroSchema;
-
-        SolrGenericRecordSink sink = new SolrGenericRecordSink();
-
-        // prepare a foo Record
-        Foo obj = new Foo();
-        obj.setField1("FakeFiled1");
-        obj.setField2("FakeFiled1");
-        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
-
-        byte[] bytes = schema.encode(obj);
-        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
-        autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
-
-        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
-            .message(message)
-            .topicName("fake_topic_name")
-            .build();
-
-        genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
-
-        when(message.getValue())
-                .thenReturn(genericAvroSchema.decode(bytes));
-
-        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
-            obj.toString(),
-            message.getValue().toString(),
-            record.getValue().toString());
-
-        // open should success
-        sink.open(configs, null);
-    }
-}
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java
deleted file mode 100644
index 033d0d2..0000000
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.FileUtils;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-
-import java.io.File;
-
-@Slf4j
-public class SolrServerUtil {
-    private JettySolrRunner standaloneSolr;
-    private int port;
-
-    public SolrServerUtil(int port) {
-        this.port = port;
-    }
-
-    public void startStandaloneSolr() throws Exception {
-        if (standaloneSolr != null) {
-            throw new IllegalStateException("Test is already running a standalone Solr instance " +
-                standaloneSolr.getBaseUrl() + "! This indicates a bug in the unit test logic.");
-        }
-
-        File solrHomeDir = new File(FileUtils.getTempDirectory().getPath() + "/solr_home");
-        String solrXml = "solr.xml";
-        FileUtils.copyFile(getFile(solrXml), new File(solrHomeDir.getAbsolutePath() + "/" + solrXml));
-        File solrLogDir = new File(solrHomeDir.getPath() + "/solr_logs");
-
-        createTempDir(solrHomeDir);
-        createTempDir(solrLogDir);
-
-        System.setProperty("host", "localhost");
-        System.setProperty("jetty.port", String.valueOf(port));
-        System.setProperty("solr.log.dir", solrLogDir.getAbsolutePath());
-
-        standaloneSolr = new JettySolrRunner(solrHomeDir.getAbsolutePath(), "/solr", port);
-        Thread bg = new Thread() {
-            public void run() {
-                try {
-                    standaloneSolr.start();
-                } catch (Exception e) {
-                    if (e instanceof RuntimeException) {
-                        throw (RuntimeException)e;
-                    } else {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }
-        };
-        bg.start();
-    }
-
-    public void stopStandaloneSolr() {
-        if (standaloneSolr != null) {
-            try {
-                standaloneSolr.stop();
-            } catch (Exception e) {
-                log.error("Failed to stop standalone solr.");
-            }
-        }
-    }
-
-    private File getFile(String name) {
-        ClassLoader classLoader = getClass().getClassLoader();
-        return new File(classLoader.getResource(name).getFile());
-    }
-
-    private void createTempDir(File file) {
-        if (!file.exists() && !file.isDirectory()) {
-            file.mkdirs();
-        }
-    }
-}
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
deleted file mode 100644
index 493542f..0000000
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.solr;
-
-import com.google.common.collect.Lists;
-import org.testng.annotations.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-/**
- * SolrSinkConfig test
- */
-public class SolrSinkConfigTest {
-
-    @Test
-    public final void loadFromYamlFileTest() throws IOException {
-        File yamlFile = getFile("sinkConfig.yaml");
-        String path = yamlFile.getAbsolutePath();
-        SolrSinkConfig config = SolrSinkConfig.load(path);
-        assertNotNull(config);
-        assertEquals("localhost:2181,localhost:2182/chroot", config.getSolrUrl());
-        assertEquals("SolrCloud", config.getSolrMode());
-        assertEquals("techproducts", config.getSolrCollection());
-        assertEquals(Integer.parseInt("100"), config.getSolrCommitWithinMs());
-        assertEquals("fakeuser", config.getUsername());
-        assertEquals("fake@123", config.getPassword());
-    }
-
-    @Test
-    public final void loadFromMapTest() throws IOException {
-        Map<String, Object> map = new HashMap<>();
-        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
-        map.put("solrMode", "SolrCloud");
-        map.put("solrCollection", "techproducts");
-        map.put("solrCommitWithinMs", "100");
-        map.put("username", "fakeuser");
-        map.put("password", "fake@123");
-
-        SolrSinkConfig config = SolrSinkConfig.load(map);
-        assertNotNull(config);
-        assertEquals("localhost:2181,localhost:2182/chroot", config.getSolrUrl());
-        assertEquals("SolrCloud", config.getSolrMode());
-        assertEquals("techproducts", config.getSolrCollection());
-        assertEquals(Integer.parseInt("100"), config.getSolrCommitWithinMs());
-        assertEquals("fakeuser", config.getUsername());
-        assertEquals("fake@123", config.getPassword());
-    }
-
-    @Test
-    public final void validValidateTest() throws IOException {
-        Map<String, Object> map = new HashMap<>();
-        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
-        map.put("solrMode", "SolrCloud");
-        map.put("solrCollection", "techproducts");
-        map.put("solrCommitWithinMs", "100");
-        map.put("username", "fakeuser");
-        map.put("password", "fake@123");
-
-        SolrSinkConfig config = SolrSinkConfig.load(map);
-        config.validate();
-    }
-
-    @Test(expectedExceptions = NullPointerException.class,
-        expectedExceptionsMessageRegExp = "solrUrl property not set.")
-    public final void missingValidValidateSolrModeTest() throws IOException {
-        Map<String, Object> map = new HashMap<>();
-        map.put("solrMode", "SolrCloud");
-        map.put("solrCollection", "techproducts");
-        map.put("solrCommitWithinMs", "100");
-        map.put("username", "fakeuser");
-        map.put("password", "fake@123");
-
-        SolrSinkConfig config = SolrSinkConfig.load(map);
-        config.validate();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class,
-        expectedExceptionsMessageRegExp = "solrCommitWithinMs must be a positive integer.")
-    public final void invalidBatchTimeMsTest() throws IOException {
-        Map<String, Object> map = new HashMap<>();
-        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
-        map.put("solrMode", "SolrCloud");
-        map.put("solrCollection", "techproducts");
-        map.put("solrCommitWithinMs", "-100");
-        map.put("username", "fakeuser");
-        map.put("password", "fake@123");
-
-        SolrSinkConfig config = SolrSinkConfig.load(map);
-        config.validate();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class,
-        expectedExceptionsMessageRegExp = "No enum constant org.apache.pulsar.io.solr.SolrAbstractSink.SolrMode.NOTSUPPORT")
-    public final void invalidClientModeTest() throws IOException {
-        Map<String, Object> map = new HashMap<>();
-        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
-        map.put("solrMode", "NotSupport");
-        map.put("solrCollection", "techproducts");
-        map.put("solrCommitWithinMs", "100");
-        map.put("username", "fakeuser");
-        map.put("password", "fake@123");
-
-        SolrSinkConfig config = SolrSinkConfig.load(map);
-        config.validate();
-
-        SolrAbstractSink.SolrMode.valueOf(config.getSolrMode().toUpperCase());
-    }
-
-    @Test
-    public final void validZkChrootTest() throws IOException {
-        Map<String, Object> map = new HashMap<>();
-        map.put("solrUrl", "localhost:2181,localhost:2182/chroot");
-        map.put("solrMode", "SolrCloud");
-        map.put("solrCollection", "techproducts");
-        map.put("solrCommitWithinMs", "100");
-        map.put("username", "fakeuser");
-        map.put("password", "fake@123");
-
-        SolrSinkConfig config = SolrSinkConfig.load(map);
-        config.validate();
-
-        String url = config.getSolrUrl();
-        int chrootIndex = url.indexOf("/");
-        Optional<String> chroot = Optional.empty();
-        if (chrootIndex > 0) {
-            chroot = Optional.of(url.substring(chrootIndex));
-        }
-        String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url;
-        List<String> zkHosts = Arrays.asList(zkUrls.split(","));
-
-        List<String> expectedZkHosts = Lists.newArrayList();
-        expectedZkHosts.add("localhost:2181");
-        expectedZkHosts.add("localhost:2182");
-
-        assertEquals("/chroot", chroot.get());
-        assertEquals(expectedZkHosts, zkHosts);
-    }
-
-    private File getFile(String name) {
-        ClassLoader classLoader = getClass().getClassLoader();
-        return new File(classLoader.getResource(name).getFile());
-    }
-}
diff --git a/pulsar-io/solr/src/test/resources/sinkConfig.yaml b/pulsar-io/solr/src/test/resources/sinkConfig.yaml
deleted file mode 100644
index d96b353..0000000
--- a/pulsar-io/solr/src/test/resources/sinkConfig.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
-"solrUrl": "localhost:2181,localhost:2182/chroot",
-"solrMode": "SolrCloud",
-"solrCollection": "techproducts",
-"solrCommitWithinMs": "100",
-"username": "fakeuser",
-"password": "fake@123"
-}
diff --git a/pulsar-io/solr/src/test/resources/solr.xml b/pulsar-io/solr/src/test/resources/solr.xml
deleted file mode 100644
index 495a71f..0000000
--- a/pulsar-io/solr/src/test/resources/solr.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<solr>
-    <solrcloud>
-        <str name="host">${host:}</str>
-        <int name="hostPort">${jetty.port:8983}</int>
-        <str name="hostContext">${hostContext:solr}</str>
-        <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
-        <int name="zkClientTimeout">${zkClientTimeout:30000}</int>
-        <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:600000}</int>
-        <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
-        <str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
-        <str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
-    </solrcloud>
-    <shardHandlerFactory name="shardHandlerFactory"
-                         class="HttpShardHandlerFactory">
-        <int name="socketTimeout">${socketTimeout:600000}</int>
-        <int name="connTimeout">${connTimeout:60000}</int>
-    </shardHandlerFactory>
-</solr>
\ No newline at end of file