You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/05 17:49:29 UTC
[incubator-pulsar] branch master updated: Misc Cleanups (#2512)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b2bf19 Misc Cleanups (#2512)
7b2bf19 is described below
commit 7b2bf197ac5b17e4fcfdb9d6aa2a7fadbe713d37
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Sep 5 10:49:26 2018 -0700
Misc Cleanups (#2512)
---
.../apache/pulsar/functions/instance/JavaInstanceRunnable.java | 10 +---------
pulsar-functions/proto/src/main/proto/Function.proto | 2 +-
2 files changed, 2 insertions(+), 10 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 344d9ee..4ba7340 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -210,14 +210,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
stats.incrementProcessed(processAt);
addLogTopicHandler();
JavaExecutionResult result;
- MessageId messageId = null;
- String topicName = null;
-
- if (currentRecord instanceof PulsarRecord) {
- PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) currentRecord;
- messageId = pulsarRecord.getMessageId();
- topicName = pulsarRecord.getTopicName().get();
- }
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
@@ -511,7 +503,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// If source classname is not set, we default pulsar source
if (sourceSpec.getClassName().isEmpty()) {
PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
- sourceSpec.getInputSpecs().forEach((topic, conf) -> {
+ sourceSpec.getInputSpecsMap().forEach((topic, conf) -> {
ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(conf.getIsRegexPattern()).build();
if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) {
consumerConfig.setSchemaType(conf.getSchemaType());
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index eff5d1d..6e969ae 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -85,7 +85,7 @@ message SourceSpec {
map<string, ConsumerSpec> inputSpecs = 10;
uint64 timeoutMs = 6;
- string topicsPattern = 7;
+ string topicsPattern = 7 [deprecated = true];
/* If specified, this will refer to an archive that is
* already present in the server */