You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/09 20:55:19 UTC

[GitHub] jihoonson closed pull request #5984: [Backport] Proper handling of the exceptions from auto persisting in AppenderatorImpl.add()

jihoonson closed pull request #5984: [Backport] Proper handling of the exceptions from auto persisting in AppenderatorImpl.add()
URL: https://github.com/apache/incubator-druid/pull/5984
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
index 2fd6877a965..811fe59ce01 100644
--- a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
+++ b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java
@@ -35,7 +35,7 @@ public ThreadRenamingCallable(
   }
 
   @Override
-  public final T call()
+  public final T call() throws Exception
   {
     final Thread currThread = Thread.currentThread();
     String currName = currThread.getName();
@@ -48,5 +48,5 @@ public final T call()
     }
   }
 
-  public abstract T doCall();
+  public abstract T doCall() throws Exception;
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 70c22187d8d..3f0f0bac73b 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -33,12 +33,11 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.common.guava.ThreadRenamingCallable;
@@ -48,10 +47,13 @@
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.RE;
 import io.druid.java.util.common.RetryUtils;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.io.Closer;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -131,6 +133,8 @@
   // and abandon threads do not step over each other
   private final Lock commitLock = new ReentrantLock();
 
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
   private volatile ListeningExecutorService persistExecutor = null;
   private volatile ListeningExecutorService pushExecutor = null;
   // use intermediate executor so that deadlock conditions can be prevented
@@ -140,7 +144,8 @@
   private volatile long nextFlush;
   private volatile FileLock basePersistDirLock = null;
   private volatile FileChannel basePersistDirLockChannel = null;
-  private AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile Throwable persistError;
 
   public AppenderatorImpl(
       DataSchema schema,
@@ -198,6 +203,13 @@ public Object startJob()
     return retVal;
   }
 
+  private void throwPersistErrorIfExists()
+  {
+    if (persistError != null) {
+      throw new RE(persistError, "Error while persisting");
+    }
+  }
+
   @Override
   public AppenderatorAddResult add(
       final SegmentIdentifier identifier,
@@ -206,6 +218,8 @@ public AppenderatorAddResult add(
       final boolean allowIncrementalPersists
   ) throws IndexSizeExceededException, SegmentNotWritableException
   {
+    throwPersistErrorIfExists();
+
     if (!identifier.getDataSource().equals(schema.getDataSource())) {
       throw new IAE(
           "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!",
@@ -244,7 +258,23 @@ public AppenderatorAddResult add(
         || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
       if (allowIncrementalPersists) {
         // persistAll clears rowsCurrentlyInMemory, no need to update it.
-        persistAll(committerSupplier == null ? null : committerSupplier.get());
+        Futures.addCallback(
+            persistAll(committerSupplier == null ? null : committerSupplier.get()),
+            new FutureCallback<Object>()
+            {
+              @Override
+              public void onSuccess(@Nullable Object result)
+              {
+                // do nothing
+              }
+
+              @Override
+              public void onFailure(Throwable t)
+              {
+                persistError = t;
+              }
+            }
+        );
       } else {
         isPersistRequired = true;
       }
@@ -340,6 +370,8 @@ public void clear() throws InterruptedException
     // Drop commit metadata, then abandon all segments.
 
     try {
+      throwPersistErrorIfExists();
+
       if (persistExecutor != null) {
         final ListenableFuture<?> uncommitFuture = persistExecutor.submit(
             new Callable<Object>()
@@ -373,7 +405,7 @@ public Object call() throws Exception
       }
     }
     catch (ExecutionException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -391,6 +423,8 @@ public Object call() throws Exception
   @Override
   public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
   {
+    throwPersistErrorIfExists();
+
     final Map<String, Integer> currentHydrants = Maps.newHashMap();
     final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
     int numPersistedRows = 0;
@@ -427,7 +461,7 @@ public Object call() throws Exception
         new ThreadRenamingCallable<Object>(threadName)
         {
           @Override
-          public Object doCall()
+          public Object doCall() throws IOException
           {
             try {
               for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
@@ -469,9 +503,9 @@ public Object doCall()
               // return null if committer is null
               return commitMetadata;
             }
-            catch (Exception e) {
+            catch (IOException e) {
               metrics.incrementFailedPersists();
-              throw Throwables.propagate(e);
+              throw e;
             }
             finally {
               metrics.incrementNumPersists();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org