You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/27 18:50:08 UTC

[GitHub] jihoonson closed pull request #6054: [Backport] Fix IllegalArgumentException in TaskLockBox.syncFromStorage()

jihoonson closed pull request #6054: [Backport] Fix IllegalArgumentException in TaskLockBox.syncFromStorage()
URL: https://github.com/apache/incubator-druid/pull/6054
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 5f039c0336b..76468b67721 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -299,9 +299,9 @@ private static String makeTaskId(String dataSource, int randomBits)
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
index cbf10f4134c..adde5e97ba7 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
@@ -36,6 +36,7 @@
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -64,17 +65,17 @@ protected AbstractTask(String id, String dataSource, Map<String, Object> context
 
   protected AbstractTask(
       String id,
-      String groupId,
-      TaskResource taskResource,
+      @Nullable String groupId,
+      @Nullable TaskResource taskResource,
       String dataSource,
-      Map<String, Object> context
+      @Nullable Map<String, Object> context
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.groupId = groupId == null ? id : groupId;
     this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource;
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
-    this.context = context;
+    this.context = context == null ? new HashMap<>() : context;
   }
 
   static String getOrMakeId(String id, final String typeName, String dataSource)
@@ -162,11 +163,13 @@ public void stopGracefully()
   @Override
   public String toString()
   {
-    return Objects.toStringHelper(this)
-                  .add("id", id)
-                  .add("type", getType())
-                  .add("dataSource", dataSource)
-                  .toString();
+    return "AbstractTask{" +
+           "id='" + id + '\'' +
+           ", groupId='" + groupId + '\'' +
+           ", taskResource=" + taskResource +
+           ", dataSource='" + dataSource + '\'' +
+           ", context=" + context +
+           '}';
   }
 
   /**
@@ -207,13 +210,21 @@ public boolean equals(Object o)
       return false;
     }
 
-    return true;
+    if (!groupId.equals(that.groupId)) {
+      return false;
+    }
+
+    if (!dataSource.equals(that.dataSource)) {
+      return false;
+    }
+
+    return context.equals(that.context);
   }
 
   @Override
   public int hashCode()
   {
-    return id.hashCode();
+    return Objects.hashCode(id, groupId, dataSource, context);
   }
 
   static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index 5e94285cd9f..ad776011043 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -160,9 +160,9 @@ public String getType()
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
+    return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
   }
 
   @VisibleForTesting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 4386b5b3595..6a08a82811b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -121,9 +121,9 @@ public HadoopIndexTask(
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 1d88733f952..6cbc27811d9 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -67,9 +67,9 @@
 import io.druid.segment.realtime.RealtimeMetricsMonitor;
 import io.druid.segment.realtime.appenderator.Appenderator;
 import io.druid.segment.realtime.appenderator.AppenderatorConfig;
-import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.SegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
@@ -170,9 +170,9 @@ public IndexTask(
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 454899f5c77..3a9fc979394 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -34,9 +34,6 @@
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
@@ -45,8 +42,11 @@
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
-import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
 import io.druid.segment.IndexIO;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
@@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment)
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
+    return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
index e060f3c2893..6c6358df4be 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
@@ -151,6 +151,12 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
     }
   }
 
+  @Override
+  public int getDefaultPriority()
+  {
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+  }
+
   public static NoopTask create()
   {
     return new NoopTask(null, null, 0, 0, null, null, null);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 2a6b3b4723a..053df113ec9 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -28,7 +28,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.Ints;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.data.input.Committer;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
@@ -45,6 +44,7 @@
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.guava.CloseQuietly;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.DruidMetrics;
 import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.Query;
@@ -162,9 +162,9 @@ public RealtimeIndexTask(
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index aeb05781ec5..c69b6b56e25 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -27,7 +27,6 @@
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
 /**
@@ -84,15 +83,28 @@
    * Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
    * be used for task scheduling, cluster resource management, etc.
    *
+   * The task priority is set in the task context when the task is submitted through the proper Overlord endpoint.
+   *
+   * During a rolling update it may be absent; this then returns {@link Tasks#DEFAULT_TASK_PRIORITY}.
+   *
    * @return task priority
    *
    * @see Tasks for default task priorities
+   * @see io.druid.indexing.overlord.http.OverlordResource#taskPost
    */
   default int getPriority()
   {
     return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
   }
 
+  /**
+   * Returns the default task priority. It can vary depending on the task type.
+   */
+  default int getDefaultPriority()
+  {
+    return Tasks.DEFAULT_TASK_PRIORITY;
+  }
+
   /**
    * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
    * require.
@@ -179,13 +191,17 @@ default int getPriority()
    */
   TaskStatus run(TaskToolbox toolbox) throws Exception;
 
-  @Nullable
+  default Map<String, Object> addToContext(String key, Object val)
+  {
+    getContext().put(key, val);
+    return getContext();
+  }
+
   Map<String, Object> getContext();
 
-  @Nullable
   default <ContextValueType> ContextValueType getContextValue(String key)
   {
-    return getContext() == null ? null : (ContextValueType) getContext().get(key);
+    return (ContextValueType) getContext().get(key);
   }
 
   default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 8e4be31b869..aeb31418b4c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -41,6 +41,7 @@
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionHolder;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.task.Tasks;
 import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import io.druid.indexing.overlord.TaskMaster;
 import io.druid.indexing.overlord.TaskQueue;
@@ -164,6 +165,11 @@ public Response taskPost(
           public Response apply(TaskQueue taskQueue)
           {
             try {
+              // Set default priority if needed
+              final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
+              if (priority == null) {
+                task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
+              }
               taskQueue.add(task);
               return Response.ok(ImmutableMap.of("task", task.getId())).build();
             }
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index 8bfdc3cb23b..f389351ed38 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -37,6 +37,7 @@
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.NoopTask;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.task.Tasks;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
 import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import io.druid.indexing.overlord.TaskLockbox;
@@ -80,6 +81,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -230,6 +232,12 @@ public void testOverlordRun() throws Exception
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
 
+    final Map<String, Object> context = task_0.getContext();
+    Assert.assertEquals(1, context.size());
+    final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
+    Assert.assertNotNull(priority);
+    Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());
+
     // Duplicate task - should fail
     response = overlordResource.taskPost(task_0, req);
     Assert.assertEquals(400, response.getStatus());
diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
index 62acf4965ee..cc79261128e 100644
--- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
@@ -20,6 +20,7 @@
 package io.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import io.druid.java.util.common.StringUtils;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
@@ -31,6 +32,7 @@
 public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
     extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
 {
+  @VisibleForTesting
   DerbyMetadataStorageActionHandler(
       SQLMetadataConnector connector,
       ObjectMapper jsonMapper,
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
index 66208be9569..7b9c9513baf 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -22,9 +22,8 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.druid.java.util.emitter.EmittingLogger;
@@ -121,53 +120,42 @@ public void insert(
   ) throws EntryExistsException
   {
     try {
-      connector.retryWithHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle) throws Exception
-            {
-              handle.createStatement(
-                  StringUtils.format(
-                      "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
-                      entryTable
-                  )
-              )
-                    .bind("id", id)
-                    .bind("created_date", timestamp.toString())
-                    .bind("datasource", dataSource)
-                    .bind("payload", jsonMapper.writeValueAsBytes(entry))
-                    .bind("active", active)
-                    .bind("status_payload", jsonMapper.writeValueAsBytes(status))
-                    .execute();
-              return null;
-            }
+      getConnector().retryWithHandle(
+          (HandleCallback<Void>) handle -> {
+            final String sql = StringUtils.format(
+                "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) "
+                + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
+                getEntryTable()
+            );
+            handle.createStatement(sql)
+                  .bind("id", id)
+                  .bind("created_date", timestamp.toString())
+                  .bind("datasource", dataSource)
+                  .bind("payload", jsonMapper.writeValueAsBytes(entry))
+                  .bind("active", active)
+                  .bind("status_payload", jsonMapper.writeValueAsBytes(status))
+                  .execute();
+            return null;
           },
-          new Predicate<Throwable>()
-          {
-            @Override
-            public boolean apply(Throwable e)
-            {
-              final boolean isStatementException = e instanceof StatementException ||
-                                                   (e instanceof CallbackFailedException
-                                                    && e.getCause() instanceof StatementException);
-              return connector.isTransientException(e) && !(isStatementException && getEntry(id).isPresent());
-            }
-          }
+          e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent())
       );
     }
     catch (Exception e) {
-      final boolean isStatementException = e instanceof StatementException ||
-                                           (e instanceof CallbackFailedException
-                                            && e.getCause() instanceof StatementException);
-      if (isStatementException && getEntry(id).isPresent()) {
+      if (isStatementException(e) && getEntry(id).isPresent()) {
         throw new EntryExistsException(id, e);
       } else {
-        throw Throwables.propagate(e);
+        throw new RuntimeException(e);
       }
     }
   }
 
+  @VisibleForTesting
+  protected static boolean isStatementException(Throwable e)
+  {
+    return e instanceof StatementException ||
+           (e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
+  }
+
   @Override
   public boolean setStatus(final String entryId, final boolean active, final StatusType status)
   {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org