You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/07/13 19:57:52 UTC

hive git commit: HIVE-20147: Hive streaming ingest is contended on synchronized logging (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master d8306cf68 -> 9c5c9409b


HIVE-20147: Hive streaming ingest is contended on synchronized logging (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9c5c9409
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9c5c9409
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9c5c9409

Branch: refs/heads/master
Commit: 9c5c9409b544fea240235d8902dca8853d63d834
Parents: d8306cf
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jul 13 12:57:04 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jul 13 12:57:04 2018 -0700

----------------------------------------------------------------------
 .../hive/streaming/AbstractRecordWriter.java    | 52 ++++++++++++++------
 .../hive/streaming/HiveStreamingConnection.java |  4 +-
 2 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9c5c9409/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 2980028..9e90d36 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -142,7 +142,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     try {
       URI uri = new URI(location);
       this.fs = FileSystem.newInstance(uri, conf);
-      LOG.info("Created new filesystem instance: {}", System.identityHashCode(this.fs));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Created new filesystem instance: {}", System.identityHashCode(this.fs));
+      }
     } catch (URISyntaxException e) {
       throw new StreamingException("Unable to create URI from location: " + location, e);
     } catch (IOException e) {
@@ -197,7 +199,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
     this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
     this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);
-    LOG.info("Memory monitorings settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}",
+    LOG.info("Memory monitoring settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}",
       autoFlush, memoryUsageThreshold, ingestSizeBytes);
     this.heapMemoryMonitor = new HeapMemoryMonitor(memoryUsageThreshold);
     MemoryUsage tenuredMemUsage = heapMemoryMonitor.getTenuredGenMemoryUsage();
@@ -329,9 +331,13 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   @Override
   public void flush() throws StreamingIOFailure {
     try {
-      logStats("Stats before flush:");
+      if (LOG.isDebugEnabled()) {
+        logStats("Stats before flush:");
+      }
       for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
-        LOG.info("Flushing record updater for partitions: {}", entry.getKey());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Flushing record updater for partitions: {}", entry.getKey());
+        }
         for (RecordUpdater updater : entry.getValue()) {
           if (updater != null) {
             updater.flush();
@@ -339,7 +345,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
         }
       }
       ingestSizeBytes = 0;
-      logStats("Stats after flush:");
+      if (LOG.isDebugEnabled()) {
+        logStats("Stats after flush:");
+      }
     } catch (IOException e) {
       throw new StreamingIOFailure("Unable to flush recordUpdater", e);
     }
@@ -349,10 +357,14 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   public void close() throws StreamingIOFailure {
     boolean haveError = false;
     String partition = null;
-    logStats("Stats before close:");
+    if (LOG.isDebugEnabled()) {
+      logStats("Stats before close:");
+    }
     for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
       partition = entry.getKey();
-      LOG.info("Closing updater for partitions: {}", partition);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing updater for partitions: {}", partition);
+      }
       for (RecordUpdater updater : entry.getValue()) {
         if (updater != null) {
           try {
@@ -367,7 +379,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
       entry.getValue().clear();
     }
     updaters.clear();
-    logStats("Stats after close:");
+    if (LOG.isDebugEnabled()) {
+      logStats("Stats after close:");
+    }
     if (haveError) {
       throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition));
     }
@@ -432,8 +446,10 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
     if (lowMemoryCanary != null) {
       if (lowMemoryCanary.get() && ingestSizeBytes > ingestSizeThreshold) {
-        LOG.info("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " +
-          "Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " +
+            "Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+        }
         flush();
         conn.getConnectionStats().incrementAutoFlushCount();
         lowMemoryCanary.set(false);
@@ -444,8 +460,10 @@ public abstract class AbstractRecordWriter implements RecordWriter {
         MemoryUsage heapUsage = mxBean.getHeapMemoryUsage();
         float memUsedFraction = ((float) heapUsage.getUsed() / (float) heapUsage.getMax());
         if (memUsedFraction > memoryUsageThreshold) {
-          LOG.info("Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " +
-            "Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " +
+              "Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+          }
           flush();
           conn.getConnectionStats().incrementAutoFlushCount();
         }
@@ -498,9 +516,13 @@ public abstract class AbstractRecordWriter implements RecordWriter {
           // partitions to TxnHandler
           if (!partitionInfo.isExists()) {
             addedPartitions.add(partitionInfo.getName());
-            LOG.info("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Created partition {} for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+            }
           } else {
-            LOG.info("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Partition {} already exists for table {}", partitionInfo.getName(), fullyQualifiedTableName);
+            }
           }
           destLocation = new Path(partitionInfo.getPartitionLocation());
         }
@@ -550,7 +572,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
       oldGenUsage = "used/max => " + LlapUtil.humanReadableByteCount(memoryUsage.getUsed()) + "/" +
         LlapUtil.humanReadableByteCount(memoryUsage.getMax());
     }
-    LOG.info("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " +
+    LOG.debug("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " +
         "buffered-ingest-size: {}, total-ingest-size: {} tenured-memory-usage: {}]", prefix, openRecordUpdaters,
       partitionPaths.size(), bufferedRecords, conn.getConnectionStats().getRecordsWritten(),
       LlapUtil.humanReadableByteCount(ingestSizeBytes),

http://git-wip-us.apache.org/repos/asf/hive/blob/9c5c9409/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 7adbadd..6cf14b0 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -552,7 +552,9 @@ public class HiveStreamingConnection implements StreamingConnection {
       getMSC().close();
       getHeatbeatMSC().close();
     }
-    LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
+    }
   }
 
   @Override