You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/07/27 18:50:09 UTC

[incubator-druid] branch 0.12.2 updated: [Backport] Fix IllegalArgumentException in TaskLockBox.syncFromStorage() (#6050) (#6054)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new f823e80  [Backport] Fix IllegalArgumentException in TaskLockBox.syncFromStorage() (#6050) (#6054)
f823e80 is described below

commit f823e80d747c36d1957b82030f12c5f5bb552307
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Jul 27 11:50:06 2018 -0700

    [Backport] Fix IllegalArgumentException in TaskLockBox.syncFromStorage() (#6050) (#6054)
---
 .../io/druid/indexing/kafka/KafkaIndexTask.java    |  4 +-
 .../druid/indexing/common/task/AbstractTask.java   | 33 +++++++----
 .../druid/indexing/common/task/CompactionTask.java |  4 +-
 .../indexing/common/task/HadoopIndexTask.java      |  4 +-
 .../io/druid/indexing/common/task/IndexTask.java   |  6 +-
 .../druid/indexing/common/task/MergeTaskBase.java  | 12 ++--
 .../io/druid/indexing/common/task/NoopTask.java    |  6 ++
 .../indexing/common/task/RealtimeIndexTask.java    |  6 +-
 .../java/io/druid/indexing/common/task/Task.java   | 24 ++++++--
 .../indexing/overlord/http/OverlordResource.java   |  6 ++
 .../druid/indexing/overlord/http/OverlordTest.java |  8 +++
 .../DerbyMetadataStorageActionHandler.java         |  2 +
 .../metadata/SQLMetadataStorageActionHandler.java  | 66 +++++++++-------------
 13 files changed, 109 insertions(+), 72 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 5f039c0..76468b6 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -299,9 +299,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
index cbf10f4..adde5e9 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
@@ -36,6 +36,7 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -64,17 +65,17 @@ public abstract class AbstractTask implements Task
 
   protected AbstractTask(
       String id,
-      String groupId,
-      TaskResource taskResource,
+      @Nullable String groupId,
+      @Nullable TaskResource taskResource,
       String dataSource,
-      Map<String, Object> context
+      @Nullable Map<String, Object> context
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.groupId = groupId == null ? id : groupId;
     this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource;
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
-    this.context = context;
+    this.context = context == null ? new HashMap<>() : context;
   }
 
   static String getOrMakeId(String id, final String typeName, String dataSource)
@@ -162,11 +163,13 @@ public abstract class AbstractTask implements Task
   @Override
   public String toString()
   {
-    return Objects.toStringHelper(this)
-                  .add("id", id)
-                  .add("type", getType())
-                  .add("dataSource", dataSource)
-                  .toString();
+    return "AbstractTask{" +
+           "id='" + id + '\'' +
+           ", groupId='" + groupId + '\'' +
+           ", taskResource=" + taskResource +
+           ", dataSource='" + dataSource + '\'' +
+           ", context=" + context +
+           '}';
   }
 
   /**
@@ -207,13 +210,21 @@ public abstract class AbstractTask implements Task
       return false;
     }
 
-    return true;
+    if (!groupId.equals(that.groupId)) {
+      return false;
+    }
+
+    if (!dataSource.equals(that.dataSource)) {
+      return false;
+    }
+
+    return context.equals(that.context);
   }
 
   @Override
   public int hashCode()
   {
-    return id.hashCode();
+    return Objects.hashCode(id, groupId, dataSource, context);
   }
 
   static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index 5e94285..ad77601 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -160,9 +160,9 @@ public class CompactionTask extends AbstractTask
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
+    return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
   }
 
   @VisibleForTesting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 4386b5b..6a08a82 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -121,9 +121,9 @@ public class HadoopIndexTask extends HadoopTask
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 1d88733..6cbc278 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -67,9 +67,9 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.RealtimeMetricsMonitor;
 import io.druid.segment.realtime.appenderator.Appenderator;
 import io.druid.segment.realtime.appenderator.AppenderatorConfig;
-import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.SegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
@@ -170,9 +170,9 @@ public class IndexTask extends AbstractTask
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 454899f..3a9fc97 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -34,9 +34,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
@@ -45,8 +42,11 @@ import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
-import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
 import io.druid.segment.IndexIO;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
@@ -133,9 +133,9 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
+    return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
index e060f3c..6c6358d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
@@ -151,6 +151,12 @@ public class NoopTask extends AbstractTask
     }
   }
 
+  @Override
+  public int getDefaultPriority()
+  {
+    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+  }
+
   public static NoopTask create()
   {
     return new NoopTask(null, null, 0, 0, null, null, null);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 2a6b3b4..053df11 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -28,7 +28,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.Ints;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.data.input.Committer;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
@@ -45,6 +44,7 @@ import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.guava.CloseQuietly;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.DruidMetrics;
 import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.Query;
@@ -162,9 +162,9 @@ public class RealtimeIndexTask extends AbstractTask
   }
 
   @Override
-  public int getPriority()
+  public int getDefaultPriority()
   {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index aeb0578..c69b6b5 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -27,7 +27,6 @@ import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
 /**
@@ -84,9 +83,14 @@ public interface Task
    * Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
    * be used for task scheduling, cluster resource management, etc.
    *
+   * The task priority must be in taskContext if the task is submitted to the proper Overlord endpoint.
+   *
+   * It might not be in taskContext in rolling update. This returns {@link Tasks#DEFAULT_TASK_PRIORITY} in this case.
+   *
    * @return task priority
    *
    * @see Tasks for default task priorities
+   * @see io.druid.indexing.overlord.http.OverlordResource#taskPost
    */
   default int getPriority()
   {
@@ -94,6 +98,14 @@ public interface Task
   }
 
   /**
+   * Returns the default task priority. It can vary depending on the task type.
+   */
+  default int getDefaultPriority()
+  {
+    return Tasks.DEFAULT_TASK_PRIORITY;
+  }
+
+  /**
    * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
    * require.
    *
@@ -179,13 +191,17 @@ public interface Task
    */
   TaskStatus run(TaskToolbox toolbox) throws Exception;
 
-  @Nullable
+  default Map<String, Object> addToContext(String key, Object val)
+  {
+    getContext().put(key, val);
+    return getContext();
+  }
+
   Map<String, Object> getContext();
 
-  @Nullable
   default <ContextValueType> ContextValueType getContextValue(String key)
   {
-    return getContext() == null ? null : (ContextValueType) getContext().get(key);
+    return (ContextValueType) getContext().get(key);
   }
 
   default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 8e4be31..aeb3141 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -41,6 +41,7 @@ import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionHolder;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.task.Tasks;
 import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import io.druid.indexing.overlord.TaskMaster;
 import io.druid.indexing.overlord.TaskQueue;
@@ -164,6 +165,11 @@ public class OverlordResource
           public Response apply(TaskQueue taskQueue)
           {
             try {
+              // Set default priority if needed
+              final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
+              if (priority == null) {
+                task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
+              }
               taskQueue.add(task);
               return Response.ok(ImmutableMap.of("task", task.getId())).build();
             }
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index 8bfdc3c..f389351 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -37,6 +37,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.NoopTask;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.task.Tasks;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
 import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import io.druid.indexing.overlord.TaskLockbox;
@@ -80,6 +81,7 @@ import javax.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -230,6 +232,12 @@ public class OverlordTest
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
 
+    final Map<String, Object> context = task_0.getContext();
+    Assert.assertEquals(1, context.size());
+    final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
+    Assert.assertNotNull(priority);
+    Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());
+
     // Duplicate task - should fail
     response = overlordResource.taskPost(task_0, req);
     Assert.assertEquals(400, response.getStatus());
diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
index 62acf49..cc79261 100644
--- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
@@ -20,6 +20,7 @@
 package io.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import io.druid.java.util.common.StringUtils;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
@@ -31,6 +32,7 @@ import java.util.Map;
 public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
     extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
 {
+  @VisibleForTesting
   DerbyMetadataStorageActionHandler(
       SQLMetadataConnector connector,
       ObjectMapper jsonMapper,
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
index 66208be..7b9c951 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -22,9 +22,8 @@ package io.druid.metadata;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.druid.java.util.emitter.EmittingLogger;
@@ -121,53 +120,42 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
   ) throws EntryExistsException
   {
     try {
-      connector.retryWithHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle) throws Exception
-            {
-              handle.createStatement(
-                  StringUtils.format(
-                      "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
-                      entryTable
-                  )
-              )
-                    .bind("id", id)
-                    .bind("created_date", timestamp.toString())
-                    .bind("datasource", dataSource)
-                    .bind("payload", jsonMapper.writeValueAsBytes(entry))
-                    .bind("active", active)
-                    .bind("status_payload", jsonMapper.writeValueAsBytes(status))
-                    .execute();
-              return null;
-            }
+      getConnector().retryWithHandle(
+          (HandleCallback<Void>) handle -> {
+            final String sql = StringUtils.format(
+                "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) "
+                + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
+                getEntryTable()
+            );
+            handle.createStatement(sql)
+                  .bind("id", id)
+                  .bind("created_date", timestamp.toString())
+                  .bind("datasource", dataSource)
+                  .bind("payload", jsonMapper.writeValueAsBytes(entry))
+                  .bind("active", active)
+                  .bind("status_payload", jsonMapper.writeValueAsBytes(status))
+                  .execute();
+            return null;
           },
-          new Predicate<Throwable>()
-          {
-            @Override
-            public boolean apply(Throwable e)
-            {
-              final boolean isStatementException = e instanceof StatementException ||
-                                                   (e instanceof CallbackFailedException
-                                                    && e.getCause() instanceof StatementException);
-              return connector.isTransientException(e) && !(isStatementException && getEntry(id).isPresent());
-            }
-          }
+          e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent())
       );
     }
     catch (Exception e) {
-      final boolean isStatementException = e instanceof StatementException ||
-                                           (e instanceof CallbackFailedException
-                                            && e.getCause() instanceof StatementException);
-      if (isStatementException && getEntry(id).isPresent()) {
+      if (isStatementException(e) && getEntry(id).isPresent()) {
         throw new EntryExistsException(id, e);
       } else {
-        throw Throwables.propagate(e);
+        throw new RuntimeException(e);
       }
     }
   }
 
+  @VisibleForTesting
+  protected static boolean isStatementException(Throwable e)
+  {
+    return e instanceof StatementException ||
+           (e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
+  }
+
   @Override
   public boolean setStatus(final String entryId, final boolean active, final StatusType status)
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org