You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2020/07/16 15:57:27 UTC
[fluo] branch master updated: Fixes #795 added submit method that
returns future to loader executor (#1100)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push:
new 737ad72 Fixes #795 added submit method that returns future to loader executor (#1100)
737ad72 is described below
commit 737ad72ec7a6594dca31837fbc9bc803978fc72c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jul 16 11:57:18 2020 -0400
Fixes #795 added submit method that returns future to loader executor (#1100)
---
.../org/apache/fluo/api/client/LoaderExecutor.java | 37 ++++++++++++-
.../fluo/core/client/LoaderExecutorAsyncImpl.java | 54 +++++++++++++++++-
.../fluo/integration/client/LoaderExecutorIT.java | 64 +++++++++++++++++++++-
3 files changed, 148 insertions(+), 7 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
index 6868d92..b69fffb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -15,6 +15,8 @@
package org.apache.fluo.api.client;
+import java.util.concurrent.CompletableFuture;
+
/**
* Executes provided {@link Loader} objects to load data into Fluo. {@link LoaderExecutor#close()}
* should be called when finished.
@@ -26,6 +28,11 @@ public interface LoaderExecutor extends AutoCloseable {
/**
* Queues a {@link Loader} task implemented by users for execution. The load task may not have
* completed when the method returns. If the queue is full, this method will block.
+ *
+ * <p>
+ * If a previously executed loader threw an exception, then this call may throw an
+ * exception. To avoid this surprising behavior, use {@link #submit(Loader)} instead, which
+ * relays exceptions through the returned future.
*/
void execute(Loader loader);
@@ -33,13 +40,37 @@ public interface LoaderExecutor extends AutoCloseable {
* Same as {@link #execute(Loader)}, but allows specifying an identity. The identity is used in
* metrics and trace logging. When an identity is not supplied, the class name is used. In the
* case of lambdas the class name may not be the same in different processes.
- *
+ *
* @since 1.1.0
*/
void execute(String identity, Loader loader);
+
+ /**
+ * Same as {@link #execute(Loader)}, except that it returns a future. The future completes
+ * upon successful commit; if an exception is thrown in the loader, it is relayed through the
+ * future instead.
+ *
+ * @param loader the loader task to queue for execution
+ * @return a future that completes when the loader's transaction commits, or completes
+ * exceptionally with the failure
+ * @since 2.0.0
+ */
+ CompletableFuture<Void> submit(Loader loader);
+
+
+ /**
+ * Same behavior as {@link #submit(Loader)}, but allows specifying an identity.
+ *
+ * @param identity see {@link #execute(String, Loader)} for a description of this parameter
+ * @param loader the loader task to queue for execution
+ * @return a future with the same completion behavior as the one returned by
+ * {@link #submit(Loader)}
+ * @since 2.0.0
+ */
+ CompletableFuture<Void> submit(String identity, Loader loader);
+
/**
* Waits for all queued and running Loader tasks to complete, then cleans up resources.
+ *
+ * <p>
+ * If a loader executed via {@link #execute(Loader)} or {@link #execute(String, Loader)} threw an
+ * exception then this method will throw an exception. Exceptions thrown by loaders executed using
+ * {@link #submit(Loader)} or {@link #submit(String, Loader)} will never cause this method to throw
+ * an exception.
*/
@Override
void close();
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index 87f9e3f..5c3a539 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -15,6 +15,7 @@
package org.apache.fluo.core.client;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
@@ -61,6 +62,7 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
Loader loader;
private AtomicBoolean done = new AtomicBoolean(false);
private String identity;
+ private CompletableFuture<Void> future;
private void close() {
txi = null;
@@ -81,15 +83,28 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
}
+ /**
+ * Creates an observer that reports its outcome through the given future. Delegates to the
+ * two-argument constructor and then stores {@code future}. When a future is present,
+ * {@code committed()} completes it and {@code failed(Throwable)} completes it exceptionally
+ * instead of calling {@code setException}.
+ *
+ * @param alias passed unchanged to the two-argument constructor
+ * @param loader2 passed unchanged to the two-argument constructor
+ * @param future future to complete when this observer is notified of the outcome
+ */
+ public LoaderCommitObserver(String alias, Loader loader2, CompletableFuture<Void> future) {
+ this(alias, loader2);
+ this.future = future;
+ }
+
+
@Override
public void committed() {
close();
+ if (future != null) {
+ future.complete(null);
+ }
}
@Override
public void failed(Throwable t) {
close();
- setException(t);
+ if (future == null) {
+ setException(t);
+ } else {
+ future.completeExceptionally(t);
+ }
}
@Override
@@ -131,7 +146,11 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
loader.load(txi, context);
env.getSharedResources().getCommitManager().beginCommit(txi, identity, this);
} catch (Exception e) {
- setException(e);
+ if (future == null) {
+ setException(e);
+ } else {
+ future.completeExceptionally(e);
+ }
close();
LoggerFactory.getLogger(LoaderCommitObserver.class).debug(e.getMessage(), e);
}
@@ -214,6 +233,37 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
}
@Override
+ public CompletableFuture<Void> submit(Loader loader) {
+ return submit(loader.getClass().getSimpleName(), loader);
+ }
+
+ /**
+  * Queues a loader for execution and returns a future that reports its outcome.
+  *
+  * <p>
+  * Waits for a semaphore permit in 50 millisecond slices so that a close of this executor is
+  * noticed while blocked instead of waiting indefinitely.
+  *
+  * @param alias identity handed to the commit observer created for this loader
+  * @param loader the loader to run
+  * @return the future handed to the commit observer, which completes it on commit or
+  *         completes it exceptionally on failure
+  * @throws IllegalStateException if this executor is closed while waiting for a permit
+  * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
+  *         interrupted while waiting; the thread's interrupt status is restored first
+  * @throws RejectedExecutionException if the underlying executor rejects the task
+  */
+ @Override
+ public CompletableFuture<Void> submit(String alias, Loader loader) {
+   CompletableFuture<Void> future = new CompletableFuture<>();
+
+   try {
+     while (!semaphore.tryAcquire(50, TimeUnit.MILLISECONDS)) {
+       if (closed.get()) {
+         throw new IllegalStateException("LoaderExecutor is closed");
+       }
+     }
+   } catch (InterruptedException e1) {
+     // Restore the interrupt status so code further up the stack can still observe it.
+     Thread.currentThread().interrupt();
+     throw new RuntimeException(e1);
+   }
+
+   try {
+     commiting.increment();
+     executor.execute(new QueueReleaseRunnable(new LoaderCommitObserver(alias, loader, future)));
+   } catch (RejectedExecutionException rje) {
+     // The task was rejected, so undo the permit and the commiting count taken above.
+     semaphore.release();
+     commiting.decrement();
+     throw rje;
+   }
+
+   return future;
+ }
+
+ @Override
public void close() {
if (closed.compareAndSet(false, true)) {
// wait for queue to empty and prevent anything else from being enqueued
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
index 36bf0ab..6f141aa 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -15,8 +15,14 @@
package org.apache.fluo.integration.client;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.AlreadySetException;
@@ -53,4 +59,58 @@ public class LoaderExecutorIT extends ITBaseMini {
Assert.assertEquals(AlreadySetException.class, e.getCause().getClass());
}
}
+
+ @Test
+ public void testSubmit() throws Exception {
+
+ LoaderExecutor le = client.newLoaderExecutor();
+
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ futures.add(le.submit("test", (tx, ctx) -> {
+ tx.set("1234", new Column("last", "date"), "20060101");
+ }));
+
+ futures.add(le.submit("test", (tx, ctx) -> {
+ tx.set("6789", new Column("last", "date"), "20050101");
+ }));
+
+ futures.add(le.submit("test", (tx, ctx) -> {
+ tx.set("0abc", new Column("last", "date"), "20070101");
+ }));
+
+ futures.add(le.submit("test", (tx, ctx) -> {
+ tx.set("ef01", new Column("last", "date"), "20040101");
+ // setting same thing should cause exception
+ tx.set("ef01", new Column("last", "date"), "20040101");
+ }));
+
+
+ // wait for transaction to commit
+ futures.get(0).get();
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertEquals("20060101", snap.gets("1234", new Column("last", "date")));
+ }
+
+ // wait for transaction to commit
+ futures.get(1).get();
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertEquals("20050101", snap.gets("6789", new Column("last", "date")));
+ }
+
+ le.close();
+
+ // wait for transaction to commit
+ futures.get(2).get();
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertEquals("20070101", snap.gets("0abc", new Column("last", "date")));
+ }
+
+ try {
+ futures.get(3).get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ // expected from future
+ }
+ }
}