You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/07/27 17:43:37 UTC
[incubator-druid] branch master updated: Fix
IllegalArgumentException in TaskLockBox.syncFromStorage() (#6050)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1524af7 Fix IllegalArgumentException in TaskLockBox.syncFromStorage() (#6050)
1524af7 is described below
commit 1524af703dac18bb4dd05e579f37e920606f9f22
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Jul 27 10:43:32 2018 -0700
Fix IllegalArgumentException in TaskLockBox.syncFromStorage() (#6050)
---
.../io/druid/indexing/kafka/KafkaIndexTask.java | 4 +-
.../druid/indexing/common/task/AbstractTask.java | 33 +++++++----
.../task/AppenderatorDriverRealtimeIndexTask.java | 6 +-
.../druid/indexing/common/task/CompactionTask.java | 4 +-
.../indexing/common/task/HadoopIndexTask.java | 6 +-
.../io/druid/indexing/common/task/IndexTask.java | 6 +-
.../druid/indexing/common/task/MergeTaskBase.java | 6 +-
.../io/druid/indexing/common/task/NoopTask.java | 6 ++
.../indexing/common/task/RealtimeIndexTask.java | 6 +-
.../java/io/druid/indexing/common/task/Task.java | 24 ++++++--
.../indexing/overlord/http/OverlordResource.java | 6 ++
.../druid/indexing/overlord/TaskLockboxTest.java | 2 +-
.../druid/indexing/overlord/http/OverlordTest.java | 8 +++
.../DerbyMetadataStorageActionHandler.java | 2 +
.../metadata/SQLMetadataStorageActionHandler.java | 66 +++++++++-------------
15 files changed, 111 insertions(+), 74 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 2f8f224..d3a5824 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -165,9 +165,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
@Override
- public int getPriority()
+ public int getDefaultPriority()
{
- return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+ return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
index 446078a..d40e42d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
@@ -36,6 +36,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,17 +65,17 @@ public abstract class AbstractTask implements Task
protected AbstractTask(
String id,
- String groupId,
- TaskResource taskResource,
+ @Nullable String groupId,
+ @Nullable TaskResource taskResource,
String dataSource,
- Map<String, Object> context
+ @Nullable Map<String, Object> context
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = groupId == null ? id : groupId;
this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource;
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
- this.context = context;
+ this.context = context == null ? new HashMap<>() : context;
}
static String getOrMakeId(String id, final String typeName, String dataSource)
@@ -162,11 +163,13 @@ public abstract class AbstractTask implements Task
@Override
public String toString()
{
- return Objects.toStringHelper(this)
- .add("id", id)
- .add("type", getType())
- .add("dataSource", dataSource)
- .toString();
+ return "AbstractTask{" +
+ "id='" + id + '\'' +
+ ", groupId='" + groupId + '\'' +
+ ", taskResource=" + taskResource +
+ ", dataSource='" + dataSource + '\'' +
+ ", context=" + context +
+ '}';
}
/**
@@ -207,13 +210,21 @@ public abstract class AbstractTask implements Task
return false;
}
- return true;
+ if (!groupId.equals(that.groupId)) {
+ return false;
+ }
+
+ if (!dataSource.equals(that.dataSource)) {
+ return false;
+ }
+
+ return context.equals(that.context);
}
@Override
public int hashCode()
{
- return id.hashCode();
+ return Objects.hashCode(id, groupId, dataSource, context);
}
static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index aaf8bfd..5dcc35b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -39,13 +39,13 @@ import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexer.IngestionState;
+import io.druid.indexer.TaskStatus;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import io.druid.indexing.common.TaskReport;
-import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
@@ -202,9 +202,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
}
@Override
- public int getPriority()
+ public int getDefaultPriority()
{
- return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+ return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index 5385557..2ffd5be 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -178,9 +178,9 @@ public class CompactionTask extends AbstractTask
}
@Override
- public int getPriority()
+ public int getDefaultPriority()
{
- return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
+ return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
}
@VisibleForTesting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index b91f5ed..a49239a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -39,12 +39,12 @@ import io.druid.indexer.IngestionState;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.TaskMetricsGetter;
import io.druid.indexer.TaskMetricsUtils;
+import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskReport;
-import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
@@ -171,9 +171,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
}
@Override
- public int getPriority()
+ public int getDefaultPriority()
{
- return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
+ return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index a6d3c53..0baafbd 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -41,6 +41,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.hll.HyperLogLogCollector;
import io.druid.indexer.IngestionState;
+import io.druid.indexer.TaskStatus;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@@ -48,7 +49,6 @@ import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import io.druid.indexing.common.TaskReport;
-import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
@@ -240,9 +240,9 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
@Override
- public int getPriority()
+ public int getDefaultPriority()
{
- return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
+ return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 77b607e1..675f27a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -33,8 +33,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
-import io.druid.indexing.common.TaskLock;
import io.druid.indexer.TaskStatus;
+import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
@@ -133,9 +133,9 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
}
@Override
- public int getPriority()
+ public int getDefaultPriority()
{
- return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
+ return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
index 6571364..fab5a9d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
@@ -151,6 +151,12 @@ public class NoopTask extends AbstractTask
}
}
+ @Override
+ public int getDefaultPriority()
+ {
+ return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+ }
+
public static NoopTask create()
{
return new NoopTask(null, null, 0, 0, null, null, null);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 00bb1bd..5ea6c08 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -33,10 +33,10 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
+import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
-import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockReleaseAction;
@@ -161,9 +161,9 @@ public class RealtimeIndexTask extends AbstractTask
}
@Override
- public int getPriority()
+ public int getDefaultPriority()
{
- return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
+ return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index 293bb74..8848f7f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -27,7 +27,6 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
-import javax.annotation.Nullable;
import java.util.Map;
/**
@@ -85,9 +84,14 @@ public interface Task
* Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
* be used for task scheduling, cluster resource management, etc.
*
+ * The task priority must be in taskContext if the task is submitted to the proper Overlord endpoint.
+ *
+ * It might not be in taskContext in rolling update. This returns {@link Tasks#DEFAULT_TASK_PRIORITY} in this case.
+ *
* @return task priority
*
* @see Tasks for default task priorities
+ * @see io.druid.indexing.overlord.http.OverlordResource#taskPost
*/
default int getPriority()
{
@@ -95,6 +99,14 @@ public interface Task
}
/**
+ * Returns the default task priority. It can vary depending on the task type.
+ */
+ default int getDefaultPriority()
+ {
+ return Tasks.DEFAULT_TASK_PRIORITY;
+ }
+
+ /**
* Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
* require.
*
@@ -180,13 +192,17 @@ public interface Task
*/
TaskStatus run(TaskToolbox toolbox) throws Exception;
- @Nullable
+ default Map<String, Object> addToContext(String key, Object val)
+ {
+ getContext().put(key, val);
+ return getContext();
+ }
+
Map<String, Object> getContext();
- @Nullable
default <ContextValueType> ContextValueType getContextValue(String key)
{
- return getContext() == null ? null : (ContextValueType) getContext().get(key);
+ return (ContextValueType) getContext().get(key);
}
default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 68b5ff6..5c59eef 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -44,6 +44,7 @@ import io.druid.indexer.TaskStatusPlus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
@@ -174,6 +175,11 @@ public class OverlordResource
public Response apply(TaskQueue taskQueue)
{
try {
+ // Set default priority if needed
+ final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
+ if (priority == null) {
+ task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
+ }
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
index 4b01bb1..f2dceb0 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
@@ -21,9 +21,9 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
+import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
-import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index 7dc3c1f..ca3eda8 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -37,6 +37,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
+import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskLockbox;
@@ -80,6 +81,7 @@ import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -231,6 +233,12 @@ public class OverlordTest
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
+ final Map<String, Object> context = task_0.getContext();
+ Assert.assertEquals(1, context.size());
+ final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
+ Assert.assertNotNull(priority);
+ Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());
+
// Duplicate task - should fail
response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(400, response.getStatus());
diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
index ce05ec2..6986cf7 100644
--- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java
@@ -20,6 +20,7 @@
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import io.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
@@ -31,6 +32,7 @@ import java.util.Map;
public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
{
+ @VisibleForTesting
DerbyMetadataStorageActionHandler(
SQLMetadataConnector connector,
ObjectMapper jsonMapper,
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
index 8374ece..2c26e34 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -22,9 +22,8 @@ package io.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.indexer.TaskInfo;
@@ -128,53 +127,42 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
) throws EntryExistsException
{
try {
- connector.retryWithHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle) throws Exception
- {
- handle.createStatement(
- StringUtils.format(
- "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
- entryTable
- )
- )
- .bind("id", id)
- .bind("created_date", timestamp.toString())
- .bind("datasource", dataSource)
- .bind("payload", jsonMapper.writeValueAsBytes(entry))
- .bind("active", active)
- .bind("status_payload", jsonMapper.writeValueAsBytes(status))
- .execute();
- return null;
- }
+ getConnector().retryWithHandle(
+ (HandleCallback<Void>) handle -> {
+ final String sql = StringUtils.format(
+ "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) "
+ + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
+ getEntryTable()
+ );
+ handle.createStatement(sql)
+ .bind("id", id)
+ .bind("created_date", timestamp.toString())
+ .bind("datasource", dataSource)
+ .bind("payload", jsonMapper.writeValueAsBytes(entry))
+ .bind("active", active)
+ .bind("status_payload", jsonMapper.writeValueAsBytes(status))
+ .execute();
+ return null;
},
- new Predicate<Throwable>()
- {
- @Override
- public boolean apply(Throwable e)
- {
- final boolean isStatementException = e instanceof StatementException ||
- (e instanceof CallbackFailedException
- && e.getCause() instanceof StatementException);
- return connector.isTransientException(e) && !(isStatementException && getEntry(id).isPresent());
- }
- }
+ e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent())
);
}
catch (Exception e) {
- final boolean isStatementException = e instanceof StatementException ||
- (e instanceof CallbackFailedException
- && e.getCause() instanceof StatementException);
- if (isStatementException && getEntry(id).isPresent()) {
+ if (isStatementException(e) && getEntry(id).isPresent()) {
throw new EntryExistsException(id, e);
} else {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
}
+ @VisibleForTesting
+ protected static boolean isStatementException(Throwable e)
+ {
+ return e instanceof StatementException ||
+ (e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
+ }
+
@Override
public boolean setStatus(final String entryId, final boolean active, final StatusType status)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org