You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/05 17:49:29 UTC

[incubator-pulsar] branch master updated: Misc Cleanups (#2512)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b2bf19  Misc Cleanups (#2512)
7b2bf19 is described below

commit 7b2bf197ac5b17e4fcfdb9d6aa2a7fadbe713d37
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Sep 5 10:49:26 2018 -0700

    Misc Cleanups (#2512)
---
 .../apache/pulsar/functions/instance/JavaInstanceRunnable.java | 10 +---------
 pulsar-functions/proto/src/main/proto/Function.proto           |  2 +-
 2 files changed, 2 insertions(+), 10 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 344d9ee..4ba7340 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -210,14 +210,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 stats.incrementProcessed(processAt);
                 addLogTopicHandler();
                 JavaExecutionResult result;
-                MessageId messageId = null;
-                String topicName = null;
-
-                if (currentRecord instanceof PulsarRecord) {
-                    PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) currentRecord;
-                    messageId = pulsarRecord.getMessageId();
-                    topicName = pulsarRecord.getTopicName().get();
-                }
 
                 result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
 
@@ -511,7 +503,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         // If source classname is not set, we default pulsar source
         if (sourceSpec.getClassName().isEmpty()) {
             PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
-            sourceSpec.getInputSpecs().forEach((topic, conf) -> {
+            sourceSpec.getInputSpecsMap().forEach((topic, conf) -> {
                 ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(conf.getIsRegexPattern()).build();
                 if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) {
                     consumerConfig.setSchemaType(conf.getSchemaType());
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index eff5d1d..6e969ae 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -85,7 +85,7 @@ message SourceSpec {
     map<string, ConsumerSpec> inputSpecs = 10;
 
     uint64 timeoutMs = 6;
-    string topicsPattern = 7;
+    string topicsPattern = 7 [deprecated = true];
 
     /* If specified, this will refer to an archive that is
      * already present in the server */