You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/02/08 05:43:02 UTC
[druid] branch master updated: Logging large segment list handling (#9312)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 831ec17 Logging large segment list handling (#9312)
831ec17 is described below
commit 831ec172f15ae852f1cc9510337f7afc99b39460
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Feb 7 21:42:45 2020 -0800
Logging large segment list handling (#9312)
* better handling of large segment lists in logs
* more
* adjust
* exceptions
* fixes
* refactor
* debug
* heh
* dang
---
.../impl/AbstractTextFilesFirehoseFactory.java | 6 +-
.../druid/java/util/common/logger/Logger.java | 119 ++++++++++++++++++++-
.../org/apache/druid/segment/SegmentUtils.java | 8 +-
.../java/org/apache/druid/timeline/SegmentId.java | 2 +-
.../druid/java/util/common/logger/LoggerTest.java | 102 ++++++++++++++++++
.../druid/security/basic/BasicAuthUtils.java | 4 +-
.../druid/security/basic/CommonCacheNotifier.java | 2 +-
.../storage/google/GoogleDataSegmentKiller.java | 2 +-
.../druid/storage/s3/S3DataSegmentMover.java | 6 +-
.../indexer/hadoop/DatasourceIngestionSpec.java | 3 +-
.../druid/indexing/common/IndexTaskClient.java | 11 +-
.../common/actions/LocalTaskActionClient.java | 2 +-
.../common/actions/SegmentAllocateAction.java | 6 +-
.../actions/SegmentTransactionalInsertAction.java | 5 +-
.../task/AppenderatorDriverRealtimeIndexTask.java | 2 +-
.../druid/indexing/common/task/ArchiveTask.java | 2 +-
.../indexing/common/task/HadoopIndexTask.java | 6 +-
.../druid/indexing/common/task/IndexTask.java | 6 +-
.../indexing/common/task/InputSourceProcessor.java | 8 +-
.../druid/indexing/common/task/RestoreTask.java | 2 +-
.../parallel/PartialDimensionDistributionTask.java | 2 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 6 +-
.../firehose/IngestSegmentFirehoseFactory.java | 2 +-
.../druid/indexing/overlord/RemoteTaskRunner.java | 3 +-
.../SeekableStreamIndexTaskRunner.java | 14 ++-
.../druid/indexing/worker/WorkerTaskManager.java | 4 +-
.../org/apache/druid/client/BrokerServerView.java | 2 +-
.../druid/client/CachingClusteredClient.java | 2 +-
.../java/org/apache/druid/client/DruidServer.java | 6 +-
.../indexing/overlord/SegmentPublishResult.java | 3 +-
.../query/lookup/LookupReferencesManager.java | 2 +-
.../loading/SegmentLoaderLocalCacheManager.java | 2 +-
.../druid/segment/loading/StorageLocation.java | 5 +-
.../realtime/appenderator/AppenderatorImpl.java | 3 +-
.../appenderator/BaseAppenderatorDriver.java | 52 +++++----
.../appenderator/SegmentsAndCommitMetadata.java | 3 +-
.../segment/realtime/plumber/RealtimePlumber.java | 4 +-
.../server/coordinator/CuratorLoadQueuePeon.java | 4 +-
.../server/coordinator/duty/BalanceSegments.java | 4 +-
.../server/coordinator/duty/CompactSegments.java | 6 +-
.../duty/EmitClusterStatsAndMetrics.java | 12 ++-
.../duty/NewestSegmentFirstIterator.java | 22 ++--
.../druid/server/coordinator/rules/LoadRule.java | 2 +-
.../druid/server/router/QueryHostFinder.java | 6 +-
.../server/router/TieredBrokerHostSelector.java | 4 +-
.../PreResponseAuthorizationCheckFilter.java | 2 +-
.../server/security/SecuritySanityCheckFilter.java | 2 +-
.../StreamAppenderatorDriverFailTest.java | 2 +-
48 files changed, 380 insertions(+), 105 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java
index 4fe2e28..592d014 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java
@@ -78,11 +78,7 @@ public abstract class AbstractTextFilesFirehoseFactory<T>
return IOUtils.lineIterator(wrapObjectStream(object, openObjectStream(object)), StandardCharsets.UTF_8);
}
catch (Exception e) {
- LOG.error(
- e,
- "Exception reading object[%s]",
- object
- );
+ LOG.error(e, "Exception reading object[%s]", object);
throw new RuntimeException(e);
}
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java b/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
index 15e16d0..bd1cf9d 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
@@ -19,13 +19,24 @@
package org.apache.druid.java.util.common.logger;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.function.BiConsumer;
+import java.util.stream.Stream;
public class Logger
{
+ @VisibleForTesting
+ static final int SEGMENTS_PER_LOG_MESSAGE = 64;
+
private final org.slf4j.Logger log;
private final boolean stackTraces;
private final Logger noStackTraceLogger;
@@ -153,12 +164,45 @@ public class Logger
public void wtf(String message, Object... formatArgs)
{
- log.error(StringUtils.nonStrictFormat("WTF?!: " + message, formatArgs), new Exception());
+ error(message, formatArgs);
}
public void wtf(Throwable t, String message, Object... formatArgs)
{
- log.error(StringUtils.nonStrictFormat("WTF?!: " + message, formatArgs), t);
+ error(t, message, formatArgs);
+ }
+
+ public void debugSegments(@Nullable final Collection<DataSegment> segments, @Nullable String preamble)
+ {
+ if (log.isDebugEnabled()) {
+ logSegments(this::debug, segments, preamble);
+ }
+ }
+
+ public void infoSegments(@Nullable final Collection<DataSegment> segments, @Nullable String preamble)
+ {
+ if (log.isInfoEnabled()) {
+ logSegments(this::info, segments, preamble);
+ }
+ }
+
+ public void infoSegmentIds(@Nullable final Stream<SegmentId> segments, @Nullable String preamble)
+ {
+ if (log.isInfoEnabled()) {
+ logSegmentIds(this::info, segments, preamble);
+ }
+ }
+
+ public void warnSegments(@Nullable final Collection<DataSegment> segments, @Nullable String preamble)
+ {
+ if (log.isWarnEnabled()) {
+ logSegments(this::warn, segments, preamble);
+ }
+ }
+
+ public void errorSegments(@Nullable final Collection<DataSegment> segments, @Nullable String preamble)
+ {
+ logSegments(this::error, segments, preamble);
}
public boolean isTraceEnabled()
@@ -188,4 +232,75 @@ public class Logger
}
}
}
+
+ /**
+ * Logs all the segment ids you could ever want, {@link #SEGMENTS_PER_LOG_MESSAGE} at a time, as a comma separated
+ * list.
+ */
+ @VisibleForTesting
+ static void logSegments(
+ Logger.LogFunction logger,
+ @Nullable final Collection<DataSegment> segments,
+ @Nullable String preamble
+ )
+ {
+ if (segments == null || segments.isEmpty()) {
+ return;
+ }
+ logSegmentIds(logger, segments.stream().map(DataSegment::getId), preamble);
+ }
+
+ /**
+ * Logs all the segment ids you could ever want, {@link #SEGMENTS_PER_LOG_MESSAGE} at a time, as a comma separated
+ * list.
+ */
+ @VisibleForTesting
+ static void logSegmentIds(
+ Logger.LogFunction logger,
+ @Nullable final Stream<SegmentId> stream,
+ @Nullable String preamble
+ )
+ {
+ Preconditions.checkNotNull(preamble);
+ if (stream == null) {
+ return;
+ }
+ final Iterator<SegmentId> iterator = stream.iterator();
+ if (!iterator.hasNext()) {
+ return;
+ }
+ final String logFormat = preamble + ": %s";
+
+ int counter = 0;
+ StringBuilder sb = null;
+ while (iterator.hasNext()) {
+ SegmentId nextId = iterator.next();
+ if (counter == 0) {
+ // use segmentId string length of first as estimate for total size of builder for this batch
+ sb = new StringBuilder(SEGMENTS_PER_LOG_MESSAGE * (2 + nextId.safeUpperLimitOfStringSize())).append("[");
+ }
+ sb.append(nextId);
+ if (++counter < SEGMENTS_PER_LOG_MESSAGE && iterator.hasNext()) {
+ sb.append(", ");
+ }
+ counter = counter % SEGMENTS_PER_LOG_MESSAGE;
+ if (counter == 0) {
+ // flush
+ sb.append("]");
+ logger.log(logFormat, sb.toString());
+ }
+ }
+
+ // check for stragglers
+ if (counter > 0) {
+ sb.append("]");
+ logger.log(logFormat, sb.toString());
+ }
+ }
+
+ @FunctionalInterface
+ public interface LogFunction
+ {
+ void log(String msg, Object... format);
+ }
}
diff --git a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
index aae1643b..6f27348 100644
--- a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
+++ b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -82,13 +83,18 @@ public class SegmentUtils
* for log messages. Not useful for anything else, because this doesn't take special effort to escape commas that
* occur in identifiers (not common, but could potentially occur in a datasource name).
*/
- public static Object commaSeparatedIdentifiers(final Collection<DataSegment> segments)
+ @Nullable
+ public static Object commaSeparatedIdentifiers(@Nullable final Collection<DataSegment> segments)
{
+ if (segments == null || segments.isEmpty()) {
+ return null;
+ }
// Lazy, to avoid preliminary string creation if logging level is turned off
return Collections2.transform(segments, DataSegment::getId);
}
private SegmentUtils()
{
+ // no instantiation
}
}
diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentId.java b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
index 37fd68d..8430524 100644
--- a/core/src/main/java/org/apache/druid/timeline/SegmentId.java
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentId.java
@@ -409,7 +409,7 @@ public final class SegmentId implements Comparable<SegmentId>
return sb.toString();
}
- private int safeUpperLimitOfStringSize()
+ public int safeUpperLimitOfStringSize()
{
int delimiters = 4;
int partitionNumSizeUpperLimit = 3; // less than 1000 partitions
diff --git a/core/src/test/java/org/apache/druid/java/util/common/logger/LoggerTest.java b/core/src/test/java/org/apache/druid/java/util/common/logger/LoggerTest.java
index 086df91..acb549b 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/logger/LoggerTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/logger/LoggerTest.java
@@ -19,8 +19,23 @@
package org.apache.druid.java.util.common.logger;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
public class LoggerTest
{
private final Logger log = new Logger(LoggerTest.class);
@@ -50,4 +65,91 @@ public class LoggerTest
log.noStackTrace().error(new RuntimeException("beep"), "");
log.error(new RuntimeException("beep"), "An exception");
}
+
+ @Test
+ public void testLogNoSegments()
+ {
+ List<String> messages = new ArrayList<>();
+ Logger.LogFunction logger = getLogToListFunction(messages);
+ Logger.logSegments(logger, Collections.emptyList(), "None segments");
+ Logger.logSegmentIds(logger, Stream.empty(), "None segments");
+
+ Assert.assertEquals(0, messages.size());
+ }
+
+ @Test
+ public void testLogSegments()
+ {
+ List<String> messages = new ArrayList<>();
+ List<DataSegment> segments = makeDataSegments(2).collect(Collectors.toList());
+ Logger.LogFunction logger = getLogToListFunction(messages);
+ Logger.logSegments(logger, segments, "Test segments");
+
+ Assert.assertEquals(1, messages.size());
+ final String expected =
+ "Test segments: [someDataSource_2012-01-01T00:00:00.000Z_2012-01-03T00:00:00.000Z_2020-02-02T00:00:00.000Z,"
+ + " someDataSource_2012-01-02T00:00:00.000Z_2012-01-04T00:00:00.000Z_2020-02-02T00:00:00.000Z]";
+ Assert.assertEquals(expected, messages.get(0));
+ }
+
+
+ @Test
+ public void testLogSegmentIds()
+ {
+ List<String> messages = new ArrayList<>();
+ Stream<SegmentId> segments = makeDataSegments(2).map(DataSegment::getId);
+ Logger.LogFunction logger = getLogToListFunction(messages);
+ Logger.logSegmentIds(logger, segments, "Test segments");
+
+ Assert.assertEquals(1, messages.size());
+ final String expected =
+ "Test segments: [someDataSource_2012-01-01T00:00:00.000Z_2012-01-03T00:00:00.000Z_2020-02-02T00:00:00.000Z,"
+ + " someDataSource_2012-01-02T00:00:00.000Z_2012-01-04T00:00:00.000Z_2020-02-02T00:00:00.000Z]";
+ Assert.assertEquals(expected, messages.get(0));
+ }
+
+
+ @Test
+ public void testLogSegmentsMany()
+ {
+ final int numSegments = 100000;
+ final MutableInt msgCount = new MutableInt();
+ final Stream<SegmentId> segments = makeDataSegments(numSegments).map(DataSegment::getId);
+
+ final Logger.LogFunction logger = (msg, format) -> {
+ String message = StringUtils.format(msg, format);
+ Assert.assertTrue(message.startsWith("Many segments: ["));
+ Assert.assertTrue(message.endsWith("]"));
+ msgCount.increment();
+ };
+ Logger.logSegmentIds(logger, segments, "Many segments");
+
+ final int expected = (int) Math.ceil((double) numSegments / Logger.SEGMENTS_PER_LOG_MESSAGE);
+ Assert.assertEquals(expected, msgCount.getValue());
+ }
+
+ private Logger.LogFunction getLogToListFunction(List<String> messages)
+ {
+ return (msg, format) -> messages.add(StringUtils.format(msg, format));
+ }
+
+ private Stream<DataSegment> makeDataSegments(int numSegments)
+ {
+ final DateTime start = DateTimes.of("2012-01-01");
+ final DateTime end = DateTimes.of("2012-01-02");
+ final String version = DateTimes.of("2020-02-02").toString();
+ return IntStream.range(0, numSegments)
+ .mapToObj(segmentNum -> DataSegment.builder()
+ .dataSource("someDataSource")
+ .interval(
+ new Interval(
+ start.plusDays(segmentNum),
+ end.plusDays(segmentNum + 1)
+ )
+ )
+ .version(version)
+ .size(1)
+ .build());
+
+ }
}
diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/BasicAuthUtils.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/BasicAuthUtils.java
index 1646fc3..91926db 100644
--- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/BasicAuthUtils.java
+++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/BasicAuthUtils.java
@@ -112,8 +112,8 @@ public class BasicAuthUtils
return key.getEncoded();
}
catch (InvalidKeySpecException ikse) {
- log.error("WTF? invalid keyspec");
- throw new RuntimeException("WTF? invalid keyspec", ikse);
+ log.error("Invalid keyspec");
+ throw new RuntimeException("Invalid keyspec", ikse);
}
catch (NoSuchAlgorithmException nsae) {
log.error("%s not supported on this system.", ALGORITHM);
diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
index ad0364d..036c5bc 100644
--- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
+++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
@@ -198,7 +198,7 @@ public class CommonCacheNotifier
);
}
catch (MalformedURLException mue) {
- LOG.error(callerName + ":WTF? Malformed url for DruidNode[%s] and baseUrl[%s]", druidNode, baseUrl);
+ LOG.error(callerName + ": Malformed url for DruidNode[%s] and baseUrl[%s]", druidNode, baseUrl);
throw new RuntimeException(mue);
}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentKiller.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentKiller.java
index 27cb989..2f925ef 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentKiller.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentKiller.java
@@ -47,7 +47,7 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
- LOG.info("Killing segment [%s]", segment);
+ LOG.info("Killing segment [%s]", segment.getId());
Map<String, Object> loadSpec = segment.getLoadSpec();
final String bucket = MapUtils.getString(loadSpec, "bucket");
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
index 0f4d298..6605e95 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
@@ -209,8 +209,10 @@ public class S3DataSegmentMover implements DataSegmentMover
if (s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
log.info(
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
- s3Bucket, s3Path,
- targetS3Bucket, targetS3Path
+ s3Bucket,
+ s3Path,
+ targetS3Bucket,
+ targetS3Path
);
} else {
throw new SegmentLoadingException(
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceIngestionSpec.java
index 20a8eb3..382f21a 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceIngestionSpec.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceIngestionSpec.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
@@ -252,7 +253,7 @@ public class DatasourceIngestionSpec
return "DatasourceIngestionSpec{" +
"dataSource='" + dataSource + '\'' +
", intervals=" + intervals +
- ", segments=" + segments +
+ ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", filter=" + filter +
", dimensions=" + dimensions +
", metrics=" + metrics +
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
index e71585f..bb073f4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
@@ -375,7 +375,9 @@ public abstract class IndexTaskClient implements AutoCloseable
if (headerId != null && !headerId.equals(taskId)) {
log.warn(
"Expected worker to have taskId [%s] but has taskId [%s], will retry in [%d]s",
- taskId, headerId, TASK_MISMATCH_RETRY_DELAY_SECONDS
+ taskId,
+ headerId,
+ TASK_MISMATCH_RETRY_DELAY_SECONDS
);
delay = Duration.standardSeconds(TASK_MISMATCH_RETRY_DELAY_SECONDS);
} else {
@@ -413,8 +415,11 @@ public abstract class IndexTaskClient implements AutoCloseable
}
}
catch (NoTaskLocationException e) {
- log.info("No TaskLocation available for task [%s], this task may not have been assigned to a worker yet or "
- + "may have already completed", taskId);
+ log.info(
+ "No TaskLocation available for task [%s], this task may not have been assigned to a worker yet "
+ + "or may have already completed",
+ taskId
+ );
throw e;
}
catch (Exception e) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
index 587675e..8c7cb53 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
@@ -51,7 +51,7 @@ public class LocalTaskActionClient implements TaskActionClient
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction)
{
- log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
+ log.debug("Performing action for task[%s]: %s", task.getId(), taskAction);
if (auditLogConfig.isEnabled() && taskAction.isAudited()) {
// Add audit log
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
index aaf30ce..4436d71 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -268,7 +268,11 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
{
// Existing segment(s) exist for this row; use the interval of the first one.
if (!usedSegment.getInterval().contains(rowInterval)) {
- log.error("The interval of existing segment[%s] doesn't contain rowInterval[%s]", usedSegment, rowInterval);
+ log.error(
+ "The interval of existing segment[%s] doesn't contain rowInterval[%s]",
+ usedSegment.getId(),
+ rowInterval
+ );
return null;
} else {
// If segment allocation failed here, it is highly likely an unrecoverable error. We log here for easier
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index f98d9b7..aad16bf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
@@ -304,8 +305,8 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
public String toString()
{
return "SegmentTransactionalInsertAction{" +
- "segmentsToBeOverwritten=" + segmentsToBeOverwritten +
- ", segments=" + segments +
+ "segmentsToBeOverwritten=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeOverwritten) +
+ ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
", dataSource=" + dataSource +
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 9ad620b..1f9865d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -634,7 +634,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
}
if (spec.getTuningConfig().isLogParseExceptions()) {
- log.error(pe, "Encountered parse exception: ");
+ log.error(pe, "Encountered parse exception");
}
if (savedParseExceptions != null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
index 9779989..9f65099 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
@@ -89,7 +89,7 @@ public class ArchiveTask extends AbstractFixedIntervalTask
if (archivedSegment != null) {
toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(archivedSegment)));
} else {
- log.info("No action was taken for [%s]", segment);
+ log.info("No action was taken for [%s]", segment.getId());
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 3537907..9d86ce8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -285,10 +285,10 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
if (e instanceof RuntimeException && e.getCause() instanceof InvocationTargetException) {
InvocationTargetException ite = (InvocationTargetException) e.getCause();
effectiveException = ite.getCause();
- log.error(effectiveException, "Got invocation target exception in run(), cause: ");
+ log.error(effectiveException, "Got invocation target exception in run()");
} else {
effectiveException = e;
- log.error(e, "Encountered exception in run():");
+ log.error(e, "Encountered exception in run()");
}
errorMsg = Throwables.getStackTraceAsString(effectiveException);
@@ -616,7 +616,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
);
}
catch (Exception e) {
- log.error(e, "Got exception from getTotalMetrics(): ");
+ log.error(e, "Got exception from getTotalMetrics()");
return null;
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index e4bb8de..be24995 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -84,7 +84,6 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.BatchIOConfig;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.IngestionSpec;
@@ -811,7 +810,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
catch (ParseException e) {
if (ingestionSchema.getTuningConfig().isLogParseExceptions()) {
- log.error(e, "Encountered parse exception: ");
+ log.error(e, "Encountered parse exception");
}
if (determinePartitionsSavedParseExceptions != null) {
@@ -979,7 +978,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
buildSegmentsMeters.getUnparseable(),
buildSegmentsMeters.getThrownAway()
);
- log.info("Published segments: %s", SegmentUtils.commaSeparatedIdentifiers(published.getSegments()));
+ log.info("Published [%s] segments", published.getSegments().size());
+ log.debugSegments(published.getSegments(), "Published segments");
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
return TaskStatus.success(getId());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
index 424fcb4..e88dab2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
@@ -35,7 +35,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
@@ -156,7 +155,7 @@ public class InputSourceProcessor
// If those segments are not pushed here, the remaining available space in appenderator will be kept
// small which could lead to smaller segments.
final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
- LOG.debug("Pushed segments: %s", SegmentUtils.commaSeparatedIdentifiers(pushed.getSegments()));
+ LOG.debugSegments(pushed.getSegments(), "Pushed segments");
}
}
} else {
@@ -175,8 +174,7 @@ public class InputSourceProcessor
}
final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
-
- LOG.debug("Pushed segments: %s", SegmentUtils.commaSeparatedIdentifiers(pushed.getSegments()));
+ LOG.debugSegments(pushed.getSegments(), "Pushed segments");
return pushed;
}
@@ -191,7 +189,7 @@ public class InputSourceProcessor
}
if (logParseExceptions) {
- LOG.error(e, "Encountered parse exception:");
+ LOG.error(e, "Encountered parse exception");
}
if (buildSegmentsSavedParseExceptions != null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
index 45ae99a..2978859 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
@@ -92,7 +92,7 @@ public class RestoreTask extends AbstractFixedIntervalTask
if (restored != null) {
restoredSegments.add(restored);
} else {
- log.info("Segment [%s] did not move, not updating metadata", segment);
+ log.info("Segment [%s] did not move, not updating metadata", segment.getId());
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 60c2d18..99304c2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -287,7 +287,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
}
catch (ParseException e) {
if (isLogParseExceptions) {
- LOG.error(e, "Encountered parse exception:");
+ LOG.error(e, "Encountered parse exception");
}
numParseExceptions++;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 5b9d1f5..a380ac8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -396,7 +396,8 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
// which makes the size of segments smaller.
final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
pushedSegments.addAll(pushed.getSegments());
- LOG.info("Pushed segments[%s]", pushed.getSegments());
+ LOG.info("Pushed [%s] segments", pushed.getSegments().size());
+ LOG.infoSegments(pushed.getSegments(), "Pushed segments");
}
} else {
throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
@@ -415,7 +416,8 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
pushedSegments.addAll(pushed.getSegments());
- LOG.info("Pushed segments[%s]", pushed.getSegments());
+ LOG.info("Pushed [%s] segments", pushed.getSegments().size());
+ LOG.infoSegments(pushed.getSegments(), "Pushed segments");
appenderator.close();
return pushedSegments;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 5ae5d2b..320b859 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -184,7 +184,7 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
@Override
public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws ParseException
{
- log.info(
+ log.debug(
"Connecting firehose: dataSource[%s], interval[%s], segmentIds[%s]",
dataSource,
interval,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 9352f65..fe6e8be 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -1302,7 +1302,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue() == null) {
log.error(
- "Huh? null work item for [%s]", entry.getKey()
+ "Huh? null work item for [%s]",
+ entry.getKey()
);
} else if (entry.getValue().getWorker() == null) {
log.error("Huh? no worker for [%s]", entry.getKey());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e37e1fb..9a18322 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -78,7 +78,6 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
@@ -1018,11 +1017,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
public void onSuccess(SegmentsAndCommitMetadata publishedSegmentsAndCommitMetadata)
{
log.info(
- "Published segments %s for sequence [%s] with metadata [%s].",
- SegmentUtils.commaSeparatedIdentifiers(publishedSegmentsAndCommitMetadata.getSegments()),
+ "Published %s segments for sequence [%s] with metadata [%s].",
+ publishedSegmentsAndCommitMetadata.getSegments().size(),
sequenceMetadata.getSequenceName(),
Preconditions.checkNotNull(publishedSegmentsAndCommitMetadata.getCommitMetadata(), "commitMetadata")
);
+ log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(), "Published segments");
sequences.remove(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());
@@ -1046,8 +1046,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
{
if (handoffSegmentsAndCommitMetadata == null) {
log.warn(
- "Failed to hand off segments: %s",
- SegmentUtils.commaSeparatedIdentifiers(publishedSegmentsAndCommitMetadata.getSegments())
+ "Failed to hand off %s segments",
+ publishedSegmentsAndCommitMetadata.getSegments().size()
+ );
+ log.warnSegments(
+ publishedSegmentsAndCommitMetadata.getSegments(),
+ "Failed to hand off segments"
);
}
handoffFuture.set(handoffSegmentsAndCommitMetadata);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index bd84c8c..6855d08 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -534,7 +534,7 @@ public abstract class WorkerTaskManager
}
}
catch (Exception ex) {
- log.info(ex, "Exception while getting active tasks from overlord. will retry on next scheduled run.");
+ log.warn(ex, "Exception while getting active tasks from overlord. will retry on next scheduled run.");
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -569,7 +569,7 @@ public abstract class WorkerTaskManager
}
}
catch (Throwable th) {
- log.error(th, "WTF! Got unknown exception while running the scheduled cleanup.");
+ log.error(th, "Got unknown exception while running the scheduled cleanup.");
}
},
1,
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 5bf4ee9..337e9a4 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -314,7 +314,7 @@ public class BrokerServerView implements TimelineServerView
synchronized (lock) {
QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) {
- log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
+ log.error("No QueryableDruidServer found for %s", server.getName());
return null;
}
return queryableDruidServer.getQueryRunner();
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index ba8b3ec..5254591 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -603,7 +603,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
final QueryRunner serverRunner = serverView.getQueryRunner(server);
if (serverRunner == null) {
- log.error("Server[%s] doesn't have a query runner", server);
+ log.error("Server[%s] doesn't have a query runner", server.getName());
return;
}
diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java
index d3bcb0d..ddcba54 100644
--- a/server/src/main/java/org/apache/druid/client/DruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/DruidServer.java
@@ -202,7 +202,11 @@ public class DruidServer implements Comparable<DruidServer>
currSize.addAndGet(segment.getSize());
totalSegments.incrementAndGet();
} else {
- log.warn("Asked to add data segment that already exists!? server[%s], segment[%s]", getName(), segment);
+ log.warn(
+ "Asked to add data segment that already exists!? server[%s], segment[%s]",
+ getName(),
+ segment.getId()
+ );
}
return dataSource;
}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
index a088c93..620ff88 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
@@ -116,7 +117,7 @@ public class SegmentPublishResult
public String toString()
{
return "SegmentPublishResult{" +
- "segments=" + segments +
+ "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", success=" + success +
", errorMsg='" + errorMsg + '\'' +
'}';
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
index 1c39522..ccc5d0b 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java
@@ -161,7 +161,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
() -> {
try {
if (!lifecycleLock.awaitStarted()) {
- LOG.error("WTF! lifecycle not started, lookup update notices will not be handled.");
+ LOG.error("Lifecycle not started, lookup update notices will not be handled.");
return;
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index a62577c..398ad67 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -263,7 +263,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
StorageLocation loc = findStorageLocationIfLoaded(segment);
if (loc == null) {
- log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
+ log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment.getId());
return;
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 40e10ec..7fa1f43 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -149,7 +149,10 @@ public class StorageLocation
if (availableSizeBytes() < segmentSize) {
log.warn(
"Segment[%s:%,d] too large for storage[%s:%,d]. Check your druid.segmentCache.locations maxSize param",
- segmentId, segmentSize, getPath(), availableSizeBytes()
+ segmentId,
+ segmentSize,
+ getPath(),
+ availableSizeBytes()
);
return false;
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 7fe1aff..fe6986d 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -1315,7 +1315,8 @@ public class AppenderatorImpl implements Appenderator
if (indexToPersist.hasSwapped()) {
log.info(
"Segment[%s] hydrant[%s] already swapped. Ignoring request to persist.",
- identifier, indexToPersist
+ identifier,
+ indexToPersist
);
return 0;
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index c0c8184..548de50 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.realtime.appenderator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
@@ -41,7 +40,6 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.DataSegment;
@@ -470,7 +468,11 @@ public abstract class BaseAppenderatorDriver implements Closeable
final boolean useUniquePath
)
{
- log.info("Pushing segments in background: [%s]", Joiner.on(", ").join(segmentIdentifiers));
+ log.info("Pushing [%s] segments in background", segmentIdentifiers.size());
+ log.infoSegmentIds(
+ segmentIdentifiers.stream().map(SegmentIdWithShardSpec::asSegmentId),
+ "Pushing segments"
+ );
return Futures.transform(
appenderator.push(segmentIdentifiers, wrappedCommitter, useUniquePath),
@@ -484,14 +486,18 @@ public abstract class BaseAppenderatorDriver implements Closeable
if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) {
log.warn(
- "Removing segments from deep storage because sanity check failed: %s",
- SegmentUtils.commaSeparatedIdentifiers(segmentsAndMetadata.getSegments())
+ "Removing [%s] segments from deep storage because sanity check failed",
+ segmentsAndMetadata.getSegments().size()
+ );
+ log.warnSegments(
+ segmentsAndMetadata.getSegments(),
+ "Removing segments due to failed sanity check"
);
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
throw new ISE(
- "WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].",
+ "Pushed different segments than requested. Pushed[%s], requested[%s].",
pushedSegments,
segmentIdentifiers
);
@@ -513,7 +519,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
*/
ListenableFuture<SegmentsAndCommitMetadata> dropInBackground(SegmentsAndCommitMetadata segmentsAndCommitMetadata)
{
- log.debug("Dropping segments: %s", SegmentUtils.commaSeparatedIdentifiers(segmentsAndCommitMetadata.getSegments()));
+ log.debugSegments(segmentsAndCommitMetadata.getSegments(), "Dropping segments");
final ListenableFuture<?> dropFuture = Futures.allAsList(
segmentsAndCommitMetadata
@@ -586,10 +592,11 @@ public abstract class BaseAppenderatorDriver implements Closeable
if (publishResult.isSuccess()) {
log.info(
- "Published segments with commit metadata [%s]: %s",
- callerMetadata,
- SegmentUtils.commaSeparatedIdentifiers(segmentsAndCommitMetadata.getSegments())
+ "Published [%s] segments with commit metadata [%s]",
+ segmentsAndCommitMetadata.getSegments().size(),
+ callerMetadata
);
+ log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
} else {
// Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
// now after all, for two possible reasons:
@@ -610,8 +617,12 @@ public abstract class BaseAppenderatorDriver implements Closeable
if (activeSegments.equals(ourSegments)) {
log.info(
- "Could not publish segments, but checked and found them already published; continuing: %s",
- SegmentUtils.commaSeparatedIdentifiers(ourSegments)
+ "Could not publish [%s] segments, but checked and found them already published; continuing.",
+ ourSegments.size()
+ );
+ log.infoSegments(
+ segmentsAndCommitMetadata.getSegments(),
+ "Could not publish segments"
);
// Clean up pushed segments if they are physically disjoint from the published ones (this means
@@ -629,23 +640,24 @@ public abstract class BaseAppenderatorDriver implements Closeable
segmentsAndCommitMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
if (publishResult.getErrorMsg() != null) {
+ log.errorSegments(ourSegments, "Failed to publish segments");
throw new ISE(
- "Failed to publish segments because of [%s]: %s",
- publishResult.getErrorMsg(),
- SegmentUtils.commaSeparatedIdentifiers(ourSegments)
+ "Failed to publish segments because of [%s]",
+ publishResult.getErrorMsg()
);
} else {
- throw new ISE("Failed to publish segments: %s", SegmentUtils.commaSeparatedIdentifiers(ourSegments));
+ log.errorSegments(ourSegments, "Failed to publish segments");
+ throw new ISE("Failed to publish segments");
}
}
}
}
catch (Exception e) {
// Must not remove segments here, we aren't sure if our transaction succeeded or not.
- log.noStackTrace().warn(
- e,
- "Failed publish, not removing segments: %s",
- SegmentUtils.commaSeparatedIdentifiers(segmentsAndCommitMetadata.getSegments())
+ log.noStackTrace().warn(e, "Failed publish");
+ log.warnSegments(
+ segmentsAndCommitMetadata.getSegments(),
+ "Failed publish, not removing segments"
);
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
index 1518e3c..3d30e01 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.appenderator;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
@@ -79,7 +80,7 @@ public class SegmentsAndCommitMetadata
{
return getClass().getSimpleName() + "{" +
"commitMetadata=" + commitMetadata +
- ", segments=" + segments +
+ ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
'}';
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index bd41e69..7a7db66 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -686,7 +686,9 @@ public class RealtimePlumber implements Plumber
if (timestamp > latestCommitTime) {
log.info(
"Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]",
- queryableIndex.getMetadata(), timestamp, latestCommitTime
+ queryableIndex.getMetadata(),
+ timestamp,
+ latestCommitTime
);
latestCommitTime = timestamp;
metadata = queryableIndex.getMetadata().get(COMMIT_METADATA_KEY);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
index 5fd9ac5..9ca70d4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -342,7 +342,9 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
if (!ZKPaths.getNodeFromPath(path).equals(segmentHolder.getSegmentIdentifier())) {
log.warn(
"Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
- basePath, path, segmentHolder
+ basePath,
+ path,
+ segmentHolder
);
return;
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index 70c3813..d42ca63 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -233,7 +233,9 @@ public class BalanceSegments implements CoordinatorDuty
log.info(
"Unable to select %d remaining candidate segments out of %d total to balance "
+ "after %d iterations, ending run.",
- (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
+ (maxSegmentsToMove - moved - unmoved),
+ maxSegmentsToMove,
+ iter
);
break;
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 9b63ae7..7b8c9b5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -30,7 +30,6 @@ import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -210,10 +209,11 @@ public class CompactSegments implements CoordinatorDuty
newAutoCompactionContext(config.getTaskContext())
);
LOG.info(
- "Submitted a compactionTask[%s] for segments %s",
+ "Submitted a compactionTask[%s] for %s segments",
taskId,
- SegmentUtils.commaSeparatedIdentifiers(segmentsToCompact)
+ segmentsToCompact.size()
);
+ LOG.infoSegments(segmentsToCompact, "Compacting segments");
// Count the compaction task itself + its sub tasks
numSubmittedTasks += findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1;
} else {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 09164e2..658017a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -109,7 +109,9 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
(final String tier, final long count) -> {
log.info(
"[%s] : Assigned %s segments among %,d servers",
- tier, count, cluster.getHistoricalsByTier(tier).size()
+ tier,
+ count,
+ cluster.getHistoricalsByTier(tier).size()
);
emitTieredStat(emitter, "segment/assigned/count", tier, count);
@@ -121,7 +123,9 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
(final String tier, final long count) -> {
log.info(
"[%s] : Dropped %s segments among %,d servers",
- tier, count, cluster.getHistoricalsByTier(tier).size()
+ tier,
+ count,
+ cluster.getHistoricalsByTier(tier).size()
);
emitTieredStat(emitter, "segment/dropped/count", tier, count);
@@ -148,7 +152,9 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
(final String tier, final long count) -> {
log.info(
"[%s] : Removed %s unneeded segments among %,d servers",
- tier, count, cluster.getHistoricalsByTier(tier).size()
+ tier,
+ count,
+ cluster.getHistoricalsByTier(tier).size()
);
emitTieredStat(emitter, "segment/unneeded/count", tier, count);
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 8525308..b4b4f23 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -257,7 +258,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
- log.info("Candidate segment[%s] is not compacted yet. Needs compaction.", candidates.segments.get(0));
+ log.info("Candidate segment[%s] is not compacted yet. Needs compaction.", candidates.segments.get(0).getId());
return true;
}
@@ -267,7 +268,15 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
.allMatch(segment -> lastCompactionState.equals(segment.getLastCompactionState()));
if (!allCandidatesHaveSameLastCompactionState) {
- log.info("Candidates[%s] were compacted with different partitions spec. Needs compaction.", candidates.segments);
+ log.info(
+ "[%s] Candidate segments were compacted with different partitions spec. Needs compaction.",
+ candidates.segments.size()
+ );
+ log.debugSegments(
+ candidates.segments,
+ "Candidate segments compacted with different partiton spec"
+ );
+
return true;
}
@@ -275,7 +284,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (!(segmentPartitionsSpec instanceof DynamicPartitionsSpec)) {
log.info(
"Candidate segment[%s] was compacted with a non dynamic partitions spec. Needs compaction.",
- candidates.segments.get(0)
+ candidates.segments.get(0).getId()
);
return true;
}
@@ -535,11 +544,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return segments.isEmpty();
}
- private int getNumSegments()
- {
- return segments.size();
- }
-
private long getTotalSize()
{
return totalSize;
@@ -549,7 +553,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
public String toString()
{
return "SegmentsToCompact{" +
- "segments=" + segments +
+ "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", totalSize=" + totalSize +
'}';
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 2a43d95..8091254 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -395,7 +395,7 @@ public abstract class LoadRule implements Rule
left = dropSegmentFromServers(balancerStrategy, segment, activeServers, left);
}
if (left != 0) {
- log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getId());
+ log.warn("I have no servers serving [%s]?", segment.getId());
}
return numToDrop - left;
}
diff --git a/server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java b/server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
index 8853ffc..0494f92 100644
--- a/server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
+++ b/server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
@@ -125,7 +125,7 @@ public class QueryHostFinder
private Server findServerInner(final Pair<String, Server> selected)
{
if (selected == null) {
- log.error("Danger, Will Robinson! Unable to find any brokers!");
+ log.error("Unable to find any brokers!");
}
final String serviceName = selected == null ? hostSelector.getDefaultServiceName() : selected.lhs;
@@ -133,7 +133,7 @@ public class QueryHostFinder
if (server == null) {
log.error(
- "WTF?! No server found for serviceName[%s]. Using backup",
+ "No server found for serviceName[%s]. Using backup",
serviceName
);
@@ -141,7 +141,7 @@ public class QueryHostFinder
if (server == null) {
log.error(
- "WTF?! No backup found for serviceName[%s]. Using default[%s]",
+ "No backup found for serviceName[%s]. Using default[%s]",
serviceName,
hostSelector.getDefaultServiceName()
);
diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
index 08ce78f..a15aa5b 100644
--- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
+++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
@@ -235,7 +235,7 @@ public class TieredBrokerHostSelector<T>
if (brokerServiceName == null) {
log.error(
- "WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].",
+ "No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].",
query.getDataSource(),
query.getIntervals(),
tierConfig.getDefaultBrokerServiceName()
@@ -247,7 +247,7 @@ public class TieredBrokerHostSelector<T>
if (nodesHolder == null) {
log.error(
- "WTF?! No nodesHolder found for brokerServiceName[%s]. Using default selector for[%s]",
+ "No nodesHolder found for brokerServiceName[%s]. Using default selector for[%s]",
brokerServiceName,
tierConfig.getDefaultBrokerServiceName()
);
diff --git a/server/src/main/java/org/apache/druid/server/security/PreResponseAuthorizationCheckFilter.java b/server/src/main/java/org/apache/druid/server/security/PreResponseAuthorizationCheckFilter.java
index 9b72b19..8bd7d9f 100644
--- a/server/src/main/java/org/apache/druid/server/security/PreResponseAuthorizationCheckFilter.java
+++ b/server/src/main/java/org/apache/druid/server/security/PreResponseAuthorizationCheckFilter.java
@@ -178,7 +178,7 @@ public class PreResponseAuthorizationCheckFilter implements Filter
outputStream.write(errorJson.getBytes(StandardCharsets.UTF_8));
}
catch (IOException ioe) {
- log.error("WTF? Can't get writer from HTTP response.");
+ log.error("Can't get writer from HTTP response.");
}
}
}
diff --git a/server/src/main/java/org/apache/druid/server/security/SecuritySanityCheckFilter.java b/server/src/main/java/org/apache/druid/server/security/SecuritySanityCheckFilter.java
index f222e5a..2e23898 100644
--- a/server/src/main/java/org/apache/druid/server/security/SecuritySanityCheckFilter.java
+++ b/server/src/main/java/org/apache/druid/server/security/SecuritySanityCheckFilter.java
@@ -101,7 +101,7 @@ public class SecuritySanityCheckFilter implements Filter
outputStream.write(errorJson.getBytes(StandardCharsets.UTF_8));
}
catch (IOException ioe) {
- log.error("WTF? Can't get writer from HTTP response.");
+ log.error("Can't get writer from HTTP response.");
}
}
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 2f1c92f..16d3c90 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,7 +239,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
- expectedException.expectMessage("Failed to publish segments because of [test]:");
+ expectedException.expectMessage("Failed to publish segments because of [test]");
testFailDuringPublishInternal(false);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org