You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/07/13 19:57:52 UTC
hive git commit: HIVE-20147: Hive streaming ingest is contented on
synchronized logging (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master d8306cf68 -> 9c5c9409b
HIVE-20147: Hive streaming ingest is contented on synchronized logging (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9c5c9409
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9c5c9409
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9c5c9409
Branch: refs/heads/master
Commit: 9c5c9409b544fea240235d8902dca8853d63d834
Parents: d8306cf
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jul 13 12:57:04 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jul 13 12:57:04 2018 -0700
----------------------------------------------------------------------
.../hive/streaming/AbstractRecordWriter.java | 52 ++++++++++++++------
.../hive/streaming/HiveStreamingConnection.java | 4 +-
2 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9c5c9409/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 2980028..9e90d36 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -142,7 +142,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
try {
URI uri = new URI(location);
this.fs = FileSystem.newInstance(uri, conf);
- LOG.info("Created new filesystem instance: {}", System.identityHashCode(this.fs));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created new filesystem instance: {}", System.identityHashCode(this.fs));
+ }
} catch (URISyntaxException e) {
throw new StreamingException("Unable to create URI from location: " + location, e);
} catch (IOException e) {
@@ -197,7 +199,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);
- LOG.info("Memory monitorings settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}",
+ LOG.info("Memory monitoring settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}",
autoFlush, memoryUsageThreshold, ingestSizeBytes);
this.heapMemoryMonitor = new HeapMemoryMonitor(memoryUsageThreshold);
MemoryUsage tenuredMemUsage = heapMemoryMonitor.getTenuredGenMemoryUsage();
@@ -329,9 +331,13 @@ public abstract class AbstractRecordWriter implements RecordWriter {
@Override
public void flush() throws StreamingIOFailure {
try {
- logStats("Stats before flush:");
+ if (LOG.isDebugEnabled()) {
+ logStats("Stats before flush:");
+ }
for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
- LOG.info("Flushing record updater for partitions: {}", entry.getKey());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flushing record updater for partitions: {}", entry.getKey());
+ }
for (RecordUpdater updater : entry.getValue()) {
if (updater != null) {
updater.flush();
@@ -339,7 +345,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
}
ingestSizeBytes = 0;
- logStats("Stats after flush:");
+ if (LOG.isDebugEnabled()) {
+ logStats("Stats after flush:");
+ }
} catch (IOException e) {
throw new StreamingIOFailure("Unable to flush recordUpdater", e);
}
@@ -349,10 +357,14 @@ public abstract class AbstractRecordWriter implements RecordWriter {
public void close() throws StreamingIOFailure {
boolean haveError = false;
String partition = null;
- logStats("Stats before close:");
+ if (LOG.isDebugEnabled()) {
+ logStats("Stats before close:");
+ }
for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
partition = entry.getKey();
- LOG.info("Closing updater for partitions: {}", partition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing updater for partitions: {}", partition);
+ }
for (RecordUpdater updater : entry.getValue()) {
if (updater != null) {
try {
@@ -367,7 +379,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
entry.getValue().clear();
}
updaters.clear();
- logStats("Stats after close:");
+ if (LOG.isDebugEnabled()) {
+ logStats("Stats after close:");
+ }
if (haveError) {
throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition));
}
@@ -432,8 +446,10 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
if (lowMemoryCanary != null) {
if (lowMemoryCanary.get() && ingestSizeBytes > ingestSizeThreshold) {
- LOG.info("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " +
- "Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " +
+ "Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+ }
flush();
conn.getConnectionStats().incrementAutoFlushCount();
lowMemoryCanary.set(false);
@@ -444,8 +460,10 @@ public abstract class AbstractRecordWriter implements RecordWriter {
MemoryUsage heapUsage = mxBean.getHeapMemoryUsage();
float memUsedFraction = ((float) heapUsage.getUsed() / (float) heapUsage.getMax());
if (memUsedFraction > memoryUsageThreshold) {
- LOG.info("Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " +
- "Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " +
+ "Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+ }
flush();
conn.getConnectionStats().incrementAutoFlushCount();
}
@@ -498,9 +516,13 @@ public abstract class AbstractRecordWriter implements RecordWriter {
// partitions to TxnHandler
if (!partitionInfo.isExists()) {
addedPartitions.add(partitionInfo.getName());
- LOG.info("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+ }
} else {
- LOG.info("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+ }
}
destLocation = new Path(partitionInfo.getPartitionLocation());
}
@@ -550,7 +572,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
oldGenUsage = "used/max => " + LlapUtil.humanReadableByteCount(memoryUsage.getUsed()) + "/" +
LlapUtil.humanReadableByteCount(memoryUsage.getMax());
}
- LOG.info("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " +
+ LOG.debug("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " +
"buffered-ingest-size: {}, total-ingest-size: {} tenured-memory-usage: {}]", prefix, openRecordUpdaters,
partitionPaths.size(), bufferedRecords, conn.getConnectionStats().getRecordsWritten(),
LlapUtil.humanReadableByteCount(ingestSizeBytes),
http://git-wip-us.apache.org/repos/asf/hive/blob/9c5c9409/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 7adbadd..6cf14b0 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -552,7 +552,9 @@ public class HiveStreamingConnection implements StreamingConnection {
getMSC().close();
getHeatbeatMSC().close();
}
- LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
+ }
}
@Override