You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/07 14:51:14 UTC

[GitHub] david-streamlio closed pull request #2539: Elastic connector

david-streamlio closed pull request #2539: Elastic connector
URL: https://github.com/apache/incubator-pulsar/pull/2539
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-io/hdfs/pom.xml b/pulsar-io/hdfs/pom.xml
index 0d552077d5..dde6e693db 100644
--- a/pulsar-io/hdfs/pom.xml
+++ b/pulsar-io/hdfs/pom.xml
@@ -26,6 +26,7 @@
     <version>2.2.0-incubating-SNAPSHOT</version>
   </parent>
   <artifactId>pulsar-io-hdfs</artifactId>
+  <name>Pulsar IO :: Hdfs</name>
   
   <dependencies>
      <dependency>
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 5c033708ba..c6baed4795 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -42,6 +42,7 @@
     <module>hdfs</module>
     <module>jdbc</module>
     <module>data-genenator</module>
+    <module>hdfs</module>
   </modules>
 
 </project>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 17634a1ab8..18bfc09bc4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -42,7 +42,14 @@
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.io.*;
+import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.HdfsSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
+import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
+import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
+import org.apache.pulsar.tests.integration.io.SinkTester;
+import org.apache.pulsar.tests.integration.io.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.Assert;
@@ -60,17 +67,17 @@
 
     @Test
     public void testKafkaSink() throws Exception {
-        testSink(new KafkaSinkTester(), true);
+        testSink(new KafkaSinkTester());
     }
 
     @Test
     public void testCassandraSink() throws Exception {
-        testSink(new CassandraSinkTester(), true);
+        testSink(new CassandraSinkTester());
     }
-
-    @Test
-    public void testCassandraArchiveSink() throws Exception {
-        testSink(new CassandraSinkArchiveTester(), false);
+    
+    @Test(enabled = false)
+    public void testHdfsSink() throws Exception {
+    	testSink(new HdfsSinkTester());
     }
     
     @Test(enabled = false)
@@ -80,10 +87,10 @@ public void testHdfsSink() throws Exception {
     
     @Test
     public void testJdbcSink() throws Exception {
-        testSink(new JdbcSinkTester(), true);
+        testSink(new JdbcSinkTester());
     }
 
-    private void testSink(SinkTester tester, boolean builtin) throws Exception {
+    private void testSink(SinkTester tester) throws Exception {
         tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
 
         final String tenant = TopicName.PUBLIC_TENANT;
@@ -91,7 +98,7 @@ private void testSink(SinkTester tester, boolean builtin) throws Exception {
         final String inputTopicName = "test-sink-connector-"
             + tester.getSinkType() + "-" + functionRuntimeType + "-input-topic-" + randomName(8);
         final String sinkName = "test-sink-connector-"
-            + tester.getSinkType().name().toLowerCase() + "-" + functionRuntimeType + "-name-" + randomName(8);
+            + tester.getSinkType() + "-" + functionRuntimeType + "-name-" + randomName(8);
         final int numMessages = 20;
 
         // prepare the testing environment for sink
@@ -101,7 +108,7 @@ private void testSink(SinkTester tester, boolean builtin) throws Exception {
         submitSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
 
         // get sink info
-        getSinkInfoSuccess(tester, tenant, namespace, sinkName, builtin);
+        getSinkInfoSuccess(tester, tenant, namespace, sinkName);
 
         // get sink status
         getSinkStatus(tenant, namespace, sinkName);
@@ -109,7 +116,7 @@ private void testSink(SinkTester tester, boolean builtin) throws Exception {
         // produce messages
         Map<String, String> kvs;
         if (tester instanceof JdbcSinkTester) {
-            kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+            kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(Foo.class));
         } else {
             kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
         }
@@ -136,31 +143,16 @@ protected void submitSinkConnector(SinkTester tester,
                                        String namespace,
                                        String sinkName,
                                        String inputTopicName) throws Exception {
-        String[] commands;
-        if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
-            commands = new String[] {
-                    PulsarCluster.ADMIN_SCRIPT,
-                    "sink", "create",
-                    "--tenant", tenant,
-                    "--namespace", namespace,
-                    "--name", sinkName,
-                    "--sink-type", tester.sinkType().name().toLowerCase(),
-                    "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
-                    "--inputs", inputTopicName
-            };
-        } else {
-            commands = new String[] {
-                    PulsarCluster.ADMIN_SCRIPT,
-                    "sink", "create",
-                    "--tenant", tenant,
-                    "--namespace", namespace,
-                    "--name", sinkName,
-                    "--archive", tester.getSinkArchive(),
-                    "--classname", tester.getSinkClassName(),
-                    "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
-                    "--inputs", inputTopicName
-            };
-        }
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "sink", "create",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName,
+            "--sink-type", tester.sinkType().name(),
+            "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+            "--inputs", inputTopicName
+        };
         log.info("Run command : {}", StringUtils.join(commands, ' '));
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         assertTrue(
@@ -171,8 +163,7 @@ protected void submitSinkConnector(SinkTester tester,
     protected void getSinkInfoSuccess(SinkTester tester,
                                       String tenant,
                                       String namespace,
-                                      String sinkName,
-                                      boolean builtin) throws Exception {
+                                      String sinkName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
@@ -183,17 +174,10 @@ protected void getSinkInfoSuccess(SinkTester tester,
         };
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get sink info : {}", result.getStdout());
-        if (builtin) {
-            assertTrue(
-                    result.getStdout().contains("\"builtin\": \"" + tester.getSinkType().name().toLowerCase() + "\""),
-                    result.getStdout()
-            );
-        } else {
-            assertTrue(
-                    result.getStdout().contains("\"className\": \"" + tester.getSinkClassName() + "\""),
-                    result.getStdout()
-            );
-        }
+        assertTrue(
+            result.getStdout().contains("\"builtin\": \"" + tester.getSinkType() + "\""),
+            result.getStdout()
+        );
     }
 
     protected void getSinkStatus(String tenant, String namespace, String sinkName) throws Exception {
@@ -258,7 +242,7 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t
         for (int i = 0; i < numMessages; i++) {
             String key = "key-" + i;
 
-            JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
+            Foo obj = new Foo();
             obj.setField1("field1_" + i);
             obj.setField2("field2_" + i);
             obj.setField3(i);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services