You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2020/07/16 15:57:27 UTC

[fluo] branch master updated: Fixes #795 added submit method that returns future to loader executor (#1100)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new 737ad72  Fixes #795 added submit method that returns future to loader executor (#1100)
737ad72 is described below

commit 737ad72ec7a6594dca31837fbc9bc803978fc72c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jul 16 11:57:18 2020 -0400

    Fixes #795 added submit method that returns future to loader executor (#1100)
---
 .../org/apache/fluo/api/client/LoaderExecutor.java | 37 ++++++++++++-
 .../fluo/core/client/LoaderExecutorAsyncImpl.java  | 54 +++++++++++++++++-
 .../fluo/integration/client/LoaderExecutorIT.java  | 64 +++++++++++++++++++++-
 3 files changed, 148 insertions(+), 7 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
index 6868d92..b69fffb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -15,6 +15,8 @@
 
 package org.apache.fluo.api.client;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Executes provided {@link Loader} objects to load data into Fluo. {@link LoaderExecutor#close()}
  * should be called when finished.
@@ -26,6 +28,11 @@ public interface LoaderExecutor extends AutoCloseable {
   /**
    * Queues {@link Loader} task implemented by users for execution. The load Task may not have
    * completed when the method returns. If the queue is full, this method will block.
+   *
+   * <p>
+   * If a previously executed loader threw an exception, then this call may throw an exception.
+   * To avoid this surprising behavior, use {@link #submit(Loader)} instead, which relays
+   * exceptions through the returned future.
    */
   void execute(Loader loader);
 
@@ -33,13 +40,37 @@ public interface LoaderExecutor extends AutoCloseable {
    * Same as {@link #execute(Loader)}, but allows specifying an identity. The identity is used in
    * metrics and trace logging. When an identity is not supplied, the class name is used. In the
    * case of lambdas the class name may not be the same in different processes.
-   * 
+   *
    * @since 1.1.0
    */
   void execute(String identity, Loader loader);
 
+
+  /**
+   * Same as {@link #execute(Loader)}, except it returns a future that completes upon successful
+   * commit. If an exception is thrown in the loader, it is relayed through the future.
+   *
+   * @since 2.0.0
+   */
+  CompletableFuture<Void> submit(Loader loader);
+
+
+  /**
+   * Same as {@link #submit(Loader)}, but allows specifying an identity.
+   *
+   * @param identity see {@link #execute(String, Loader)} for a description of this parameter
+   * @since 2.0.0
+   */
+  CompletableFuture<Void> submit(String identity, Loader loader);
+
   /**
    * Waits for all queued and running Loader task to complete, then cleans up resources.
+   *
+   * <p>
+   * If a loader executed via {@link #execute(Loader)} or {@link #execute(String, Loader)} threw an
+   * exception then this method will throw an exception. Exceptions thrown by loaders executed using
+   * {@link #submit(Loader)} or {@link #submit(String, Loader)} will never cause this method to throw
+   * an exception.
    */
   @Override
   void close();
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index 87f9e3f..5c3a539 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -15,6 +15,7 @@
 
 package org.apache.fluo.core.client;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
@@ -61,6 +62,7 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
     Loader loader;
     private AtomicBoolean done = new AtomicBoolean(false);
     private String identity;
+    private CompletableFuture<Void> future;
 
     private void close() {
       txi = null;
@@ -81,15 +83,28 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
     }
 
 
+    public LoaderCommitObserver(String alias, Loader loader2, CompletableFuture<Void> future) {
+      this(alias, loader2);
+      this.future = future;
+    }
+
+
     @Override
     public void committed() {
       close();
+      if (future != null) {
+        future.complete(null);
+      }
     }
 
     @Override
     public void failed(Throwable t) {
       close();
-      setException(t);
+      if (future == null) {
+        setException(t);
+      } else {
+        future.completeExceptionally(t);
+      }
     }
 
     @Override
@@ -131,7 +146,11 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
         loader.load(txi, context);
         env.getSharedResources().getCommitManager().beginCommit(txi, identity, this);
       } catch (Exception e) {
-        setException(e);
+        if (future == null) {
+          setException(e);
+        } else {
+          future.completeExceptionally(e);
+        }
         close();
         LoggerFactory.getLogger(LoaderCommitObserver.class).debug(e.getMessage(), e);
       }
@@ -214,6 +233,37 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
   }
 
   @Override
+  public CompletableFuture<Void> submit(Loader loader) {
+    return submit(loader.getClass().getSimpleName(), loader);
+  }
+
+  @Override
+  public CompletableFuture<Void> submit(String alias, Loader loader) {
+    CompletableFuture<Void> future = new CompletableFuture<Void>();
+
+    try {
+      while (!semaphore.tryAcquire(50, TimeUnit.MILLISECONDS)) {
+        if (closed.get()) {
+          throw new IllegalStateException("LoaderExecutor is closed");
+        }
+      }
+    } catch (InterruptedException e1) {
+      throw new RuntimeException(e1);
+    }
+
+    try {
+      commiting.increment();
+      executor.execute(new QueueReleaseRunnable(new LoaderCommitObserver(alias, loader, future)));
+    } catch (RejectedExecutionException rje) {
+      semaphore.release();
+      commiting.decrement();
+      throw rje;
+    }
+
+    return future;
+  }
+
+  @Override
   public void close() {
     if (closed.compareAndSet(false, true)) {
       // wait for queue to empty and prevent anything else from being enqueued
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
index 36bf0ab..6f141aa 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/LoaderExecutorIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -15,8 +15,14 @@
 
 package org.apache.fluo.integration.client;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.fluo.api.client.Loader;
 import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.exceptions.AlreadySetException;
@@ -53,4 +59,58 @@ public class LoaderExecutorIT extends ITBaseMini {
       Assert.assertEquals(AlreadySetException.class, e.getCause().getClass());
     }
   }
+
+  @Test
+  public void testSubmit() throws Exception {
+
+    LoaderExecutor le = client.newLoaderExecutor();
+
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+    futures.add(le.submit("test", (tx, ctx) -> {
+      tx.set("1234", new Column("last", "date"), "20060101");
+    }));
+
+    futures.add(le.submit("test", (tx, ctx) -> {
+      tx.set("6789", new Column("last", "date"), "20050101");
+    }));
+
+    futures.add(le.submit("test", (tx, ctx) -> {
+      tx.set("0abc", new Column("last", "date"), "20070101");
+    }));
+
+    futures.add(le.submit("test", (tx, ctx) -> {
+      tx.set("ef01", new Column("last", "date"), "20040101");
+      // setting same thing should cause exception
+      tx.set("ef01", new Column("last", "date"), "20040101");
+    }));
+
+
+    // wait for transaction to commit
+    futures.get(0).get();
+    try (Snapshot snap = client.newSnapshot()) {
+      Assert.assertEquals("20060101", snap.gets("1234", new Column("last", "date")));
+    }
+
+    // wait for transaction to commit
+    futures.get(1).get();
+    try (Snapshot snap = client.newSnapshot()) {
+      Assert.assertEquals("20050101", snap.gets("6789", new Column("last", "date")));
+    }
+
+    le.close();
+
+    // wait for transaction to commit
+    futures.get(2).get();
+    try (Snapshot snap = client.newSnapshot()) {
+      Assert.assertEquals("20070101", snap.gets("0abc", new Column("last", "date")));
+    }
+
+    try {
+      futures.get(3).get();
+      Assert.fail();
+    } catch (ExecutionException e) {
+      // expected from future
+    }
+  }
 }