You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/27 17:43:34 UTC

[GitHub] jihoonson closed pull request #6050: Fix IllegalArgumentException in TaskLockBox.syncFromStorage() on rolling update

jihoonson closed pull request #6050: Fix IllegalArgumentException in TaskLockBox.syncFromStorage() on rolling update
URL: https://github.com/apache/incubator-druid/pull/6050
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 2f8f224289f..d3a58240d0f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -165,9 +165,9 @@ private static String makeTaskId(String dataSource, int randomBits)
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
index 446078ab1f0..d40e42de67a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
@@ -36,6 +36,7 @@
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -64,17 +65,17 @@ protected AbstractTask(String id, String dataSource, Map<String, Object> context
 
   protected AbstractTask(
       String id,
-      String groupId,
-      TaskResource taskResource,
+      @Nullable String groupId,
+      @Nullable TaskResource taskResource,
       String dataSource,
-      Map<String, Object> context
+      @Nullable Map<String, Object> context
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.groupId = groupId == null ? id : groupId;
     this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource;
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
-    this.context = context;
+    this.context = context == null ? new HashMap<>() : context;
   }
 
   static String getOrMakeId(String id, final String typeName, String dataSource)
@@ -162,11 +163,13 @@ public void stopGracefully()
   @Override
   public String toString()
   {
-    return Objects.toStringHelper(this)
-                  .add("id", id)
-                  .add("type", getType())
-                  .add("dataSource", dataSource)
-                  .toString();
+    return "AbstractTask{" +
+           "id='" + id + '\'' +
+           ", groupId='" + groupId + '\'' +
+           ", taskResource=" + taskResource +
+           ", dataSource='" + dataSource + '\'' +
+           ", context=" + context +
+           '}';
   }
 
   /**
@@ -207,13 +210,21 @@ public boolean equals(Object o)
       return false;
     }
 
-    return true;
+    if (!groupId.equals(that.groupId)) {
+      return false;
+    }
+
+    if (!dataSource.equals(that.dataSource)) {
+      return false;
+    }
+
+    return context.equals(that.context);
   }
 
   @Override
   public int hashCode()
   {
-    return id.hashCode();
+    return Objects.hashCode(id, groupId, dataSource, context);
   }
 
   static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index aaf8bfd04ae..5dcc35bde4a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -39,13 +39,13 @@
 import io.druid.discovery.DruidNodeDiscoveryProvider;
 import io.druid.discovery.LookupNodeService;
 import io.druid.indexer.IngestionState;
+import io.druid.indexer.TaskStatus;
 import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import io.druid.indexing.common.TaskReport;
-import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import io.druid.indexing.common.actions.TaskActionClient;
@@ -202,9 +202,9 @@ public AppenderatorDriverRealtimeIndexTask(
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index 53855574db2..2ffd5be8a90 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -178,9 +178,9 @@ public String getType()
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
+    return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
   }
 
   @VisibleForTesting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index b91f5edfa82..a49239abcc7 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -39,12 +39,12 @@
 import io.druid.indexer.MetadataStorageUpdaterJobHandler;
 import io.druid.indexer.TaskMetricsGetter;
 import io.druid.indexer.TaskMetricsUtils;
+import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskLockType;
 import io.druid.indexing.common.TaskReport;
-import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
 import io.druid.indexing.common.actions.LockTryAcquireAction;
@@ -171,9 +171,9 @@ public HadoopIndexTask(
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index a6d3c53060f..0baafbdb81a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -41,6 +41,7 @@
 import io.druid.data.input.Rows;
 import io.druid.hll.HyperLogLogCollector;
 import io.druid.indexer.IngestionState;
+import io.druid.indexer.TaskStatus;
 import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@@ -48,7 +49,6 @@
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import io.druid.indexing.common.TaskReport;
-import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import io.druid.indexing.common.actions.TaskActionClient;
@@ -240,9 +240,9 @@ public IndexTask(
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 77b607e1246..675f27a64f6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -33,8 +33,8 @@
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
-import io.druid.indexing.common.TaskLock;
 import io.druid.indexer.TaskStatus;
+import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.SegmentListUsedAction;
 import io.druid.indexing.common.actions.TaskActionClient;
@@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment)
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
+    return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
index 6571364ac16..fab5a9d6b06 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
@@ -151,6 +151,12 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
     }
   }
 
+  @Override
+  public int getDefaultPriority()
+  {
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+  }
+
   public static NoopTask create()
   {
     return new NoopTask(null, null, 0, 0, null, null, null);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 00bb1bdac1c..5ea6c081020 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -33,10 +33,10 @@
 import io.druid.discovery.DiscoveryDruidNode;
 import io.druid.discovery.DruidNodeDiscoveryProvider;
 import io.druid.discovery.LookupNodeService;
+import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskLockType;
 import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
-import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
 import io.druid.indexing.common.actions.LockReleaseAction;
@@ -161,9 +161,9 @@ public RealtimeIndexTask(
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index 293bb74350d..8848f7fa8a5 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -27,7 +27,6 @@
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
 /**
@@ -85,15 +84,28 @@
    * Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
    * be used for task scheduling, cluster resource management, etc.
    *
+   * The task priority must be in taskContext if the task is submitted to the proper Overlord endpoint.
+   *
+   * It might be missing from taskContext during a rolling update; {@link Tasks#DEFAULT_TASK_PRIORITY} is returned then.
+   *
    * @return task priority
    *
    * @see Tasks for default task priorities
+   * @see io.druid.indexing.overlord.http.OverlordResource#taskPost
    */
   default int getPriority()
   {
     return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
   }
 
+  /**
+   * Returns the default task priority. It can vary depending on the task type.
+   */
+  default int getDefaultPriority()
+  {
+    return Tasks.DEFAULT_TASK_PRIORITY;
+  }
+
   /**
    * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
    * require.
@@ -180,13 +192,17 @@ default int getPriority()
    */
   TaskStatus run(TaskToolbox toolbox) throws Exception;
 
-  @Nullable
+  default Map<String, Object> addToContext(String key, Object val)
+  {
+    getContext().put(key, val);
+    return getContext();
+  }
+
   Map<String, Object> getContext();
 
-  @Nullable
   default <ContextValueType> ContextValueType getContextValue(String key)
   {
-    return getContext() == null ? null : (ContextValueType) getContext().get(key);
+    return (ContextValueType) getContext().get(key);
   }
 
   default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 68b5ff69167..5c59eefaf82 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -44,6 +44,7 @@
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionHolder;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.task.Tasks;
 import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import io.druid.indexing.overlord.TaskMaster;
 import io.druid.indexing.overlord.TaskQueue;
@@ -174,6 +175,11 @@ public Response taskPost(
           public Response apply(TaskQueue taskQueue)
           {
             try {
+              // Set default priority if needed
+              final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
+              if (priority == null) {
+                task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
+              }
               taskQueue.add(task);
               return Response.ok(ImmutableMap.of("task", task.getId())).build();
             }
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
index 4b01bb163f5..f2dceb0ee30 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
@@ -21,9 +21,9 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Iterables;
+import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskLockType;
-import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.NoopTask;
 import io.druid.indexing.common.task.Task;
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index 7dc3c1f2880..ca3eda8512f 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -37,6 +37,7 @@
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.NoopTask;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.task.Tasks;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
 import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import io.druid.indexing.overlord.TaskLockbox;
@@ -80,6 +81,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -231,6 +233,12 @@ public void testOverlordRun() throws Exception
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
 
+    final Map<String, Object> context = task_0.getContext();
+    Assert.assertEquals(1, context.size());
+    final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
+    Assert.assertNotNull(priority);
+    Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());
+
     // Duplicate task - should fail
     response = overlordResource.taskPost(task_0, req);
     Assert.assertEquals(400, response.getStatus());
diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
index ce05ec2c903..6986cf7906d 100644
--- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
@@ -20,6 +20,7 @@
 package io.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import io.druid.java.util.common.StringUtils;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
@@ -31,6 +32,7 @@
 public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
     extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
 {
+  @VisibleForTesting
   DerbyMetadataStorageActionHandler(
       SQLMetadataConnector connector,
       ObjectMapper jsonMapper,
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
index 8374ecee07d..2c26e34ae57 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -22,9 +22,8 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.druid.indexer.TaskInfo;
@@ -128,53 +127,42 @@ public void insert(
   ) throws EntryExistsException
   {
     try {
-      connector.retryWithHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle) throws Exception
-            {
-              handle.createStatement(
-                  StringUtils.format(
-                      "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
-                      entryTable
-                  )
-              )
-                    .bind("id", id)
-                    .bind("created_date", timestamp.toString())
-                    .bind("datasource", dataSource)
-                    .bind("payload", jsonMapper.writeValueAsBytes(entry))
-                    .bind("active", active)
-                    .bind("status_payload", jsonMapper.writeValueAsBytes(status))
-                    .execute();
-              return null;
-            }
+      getConnector().retryWithHandle(
+          (HandleCallback<Void>) handle -> {
+            final String sql = StringUtils.format(
+                "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) "
+                + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
+                getEntryTable()
+            );
+            handle.createStatement(sql)
+                  .bind("id", id)
+                  .bind("created_date", timestamp.toString())
+                  .bind("datasource", dataSource)
+                  .bind("payload", jsonMapper.writeValueAsBytes(entry))
+                  .bind("active", active)
+                  .bind("status_payload", jsonMapper.writeValueAsBytes(status))
+                  .execute();
+            return null;
           },
-          new Predicate<Throwable>()
-          {
-            @Override
-            public boolean apply(Throwable e)
-            {
-              final boolean isStatementException = e instanceof StatementException ||
-                                                   (e instanceof CallbackFailedException
-                                                    && e.getCause() instanceof StatementException);
-              return connector.isTransientException(e) && !(isStatementException && getEntry(id).isPresent());
-            }
-          }
+          e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent())
       );
     }
     catch (Exception e) {
-      final boolean isStatementException = e instanceof StatementException ||
-                                           (e instanceof CallbackFailedException
-                                            && e.getCause() instanceof StatementException);
-      if (isStatementException && getEntry(id).isPresent()) {
+      if (isStatementException(e) && getEntry(id).isPresent()) {
         throw new EntryExistsException(id, e);
       } else {
-        throw Throwables.propagate(e);
+        throw new RuntimeException(e);
       }
     }
   }
 
+  @VisibleForTesting
+  protected static boolean isStatementException(Throwable e)
+  {
+    return e instanceof StatementException ||
+           (e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
+  }
+
   @Override
   public boolean setStatus(final String entryId, final boolean active, final StatusType status)
   {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org