You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/08/24 17:37:31 UTC
[pulsar] branch master updated: Add ability for BatchPushSource to
notify errors asynchronously (#7865)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0724482 Add ability for BatchPushSource to notify errors asynchronously (#7865)
0724482 is described below
commit 0724482fa9640babe425c761a0765eea7b99d7b5
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Aug 24 10:37:04 2020 -0700
Add ability for BatchPushSource to notify errors asynchronously (#7865)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../BatchDataGeneratorPushSource.java | 81 ++++++++++++++++++++++
.../org/apache/pulsar/io/core/BatchPushSource.java | 26 +++++++
.../apache/pulsar/io/core/BatchPushSourceTest.java | 57 +++++++++++++++
3 files changed, 164 insertions(+)
diff --git a/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java b/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
new file mode 100644
index 0000000..c1c2920
--- /dev/null
+++ b/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.batchdatagenerator;
+
+import io.codearte.jfairy.Fairy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.BatchPushSource;
+import org.apache.pulsar.io.core.SourceContext;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+@Slf4j
+public class BatchDataGeneratorPushSource extends BatchPushSource<Person> implements Runnable {
+
+ private Fairy fairy;
+ private SourceContext sourceContext;
+ private int maxRecordsPerCycle = 10;
+
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ @Override
+ public void close() {
+ executor.shutdownNow();
+ }
+
+ @Override
+ public void open(Map config, SourceContext context) throws Exception {
+ this.fairy = Fairy.create();
+ this.sourceContext = context;
+ }
+
+ @Override
+ public void discover(Consumer taskEater) throws Exception {
+ log.info("Generating one task for each instance");
+ for (int i = 0; i < sourceContext.getNumInstances(); ++i) {
+ taskEater.accept(String.format("something-%d", System.currentTimeMillis()).getBytes());
+ }
+ }
+
+ @Override
+ public void prepare(byte[] instanceSplit) throws Exception {
+ log.info("Instance " + sourceContext.getInstanceId() + " got a new discovered task {}", new String(instanceSplit));
+ executor.submit(this);
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < maxRecordsPerCycle; i++) {
+ Thread.sleep(50);
+ Record<Person> record = () -> new Person(fairy.person());
+ consume(record);
+ }
+ // this task is completed
+ consume(null);
+ } catch (Exception e) {
+ notifyError(e);
+ }
+ }
+}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
index 9f44fd5..6cb5a49 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
@@ -37,6 +37,21 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
}
}
+ private static class ErrorNotifierRecord implements Record {
+ private Exception e;
+ public ErrorNotifierRecord(Exception e) {
+ this.e = e;
+ }
+ @Override
+ public Object getValue() {
+ return null;
+ }
+
+ public Exception getException() {
+ return e;
+ }
+ }
+
private LinkedBlockingQueue<Record<T>> queue;
private static final int DEFAULT_QUEUE_LENGTH = 1000;
private final NullRecord nullRecord = new NullRecord();
@@ -48,6 +63,9 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
@Override
public Record<T> readNext() throws Exception {
Record<T> record = queue.take();
+ if (record instanceof ErrorNotifierRecord) {
+ throw ((ErrorNotifierRecord) record).getException();
+ }
if (record instanceof NullRecord) {
return null;
} else {
@@ -80,4 +98,12 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
public int getQueueLength() {
return DEFAULT_QUEUE_LENGTH;
}
+
+ /**
+  * Allows the source to notify errors asynchronously.
+  *
+  * <p>The exception is wrapped in an {@code ErrorNotifierRecord} and handed to
+  * {@code consume}; {@code readNext()} rethrows it when it encounters that record.
+  *
+  * @param ex the error to report; must not be {@code null}
+  * @throws NullPointerException if {@code ex} is {@code null}
+  */
+ public void notifyError(Exception ex) {
+     // Fail fast at the call site: a null would otherwise be enqueued silently and only
+     // surface later as a NullPointerException from the throw statement in readNext().
+     java.util.Objects.requireNonNull(ex, "ex");
+     consume(new ErrorNotifierRecord(ex));
+ }
}
\ No newline at end of file
diff --git a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/BatchPushSourceTest.java b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/BatchPushSourceTest.java
new file mode 100644
index 0000000..3ac2b49
--- /dev/null
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/BatchPushSourceTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import org.apache.pulsar.io.core.BatchPushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Unit tests for {@link BatchPushSource}.
+ */
+public class BatchPushSourceTest {
+
+    /**
+     * Minimal source whose lifecycle hooks do nothing, so the test exercises only the
+     * behaviour inherited from {@link BatchPushSource}.
+     */
+    private final BatchPushSource<String> testBatchSource = new BatchPushSource<String>() {
+        @Override
+        public void open(Map<String, Object> config, SourceContext context) throws Exception {
+            // intentionally empty
+        }
+
+        @Override
+        public void discover(Consumer<byte[]> taskEater) throws Exception {
+            // intentionally empty
+        }
+
+        @Override
+        public void prepare(byte[] task) throws Exception {
+            // intentionally empty
+        }
+
+        @Override
+        public void close() throws Exception {
+            // intentionally empty
+        }
+    };
+
+    /**
+     * An exception passed to {@code notifyError} must be rethrown, unchanged, by the next
+     * call to {@code readNext}.
+     */
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
+    public void testNotifyErrors() throws Exception {
+        testBatchSource.notifyError(new RuntimeException("test exception"));
+        testBatchSource.readNext();
+    }
+}