You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/06/05 22:33:23 UTC

hive git commit: HIVE-19772: Streaming ingest V2 API can generate invalid orc file if interrupted (Prasanth Jayachandran reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master afc5fa49a -> c166202ac


HIVE-19772: Streaming ingest V2 API can generate invalid orc file if interrupted (Prasanth Jayachandran reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c166202a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c166202a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c166202a

Branch: refs/heads/master
Commit: c166202ac04dfcfb5e0aa6648f1c79f5ff8d11f2
Parents: afc5fa4
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue Jun 5 15:32:56 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue Jun 5 15:32:56 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 29 +++++++++++++-
 .../hive/streaming/AbstractRecordWriter.java    | 15 +++++++
 .../hive/streaming/HiveStreamingConnection.java | 42 ++++++++++++--------
 3 files changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c166202a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index e187ce1..1359dc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -493,6 +493,9 @@ public class OrcRecordUpdater implements RecordUpdater {
   public void close(boolean abort) throws IOException {
     if (abort) {
       if (flushLengths == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Close on abort for path: {}.. Deleting..", path);
+        }
         fs.delete(path, false);
       }
     } else if (!writerClosed) {
@@ -500,25 +503,47 @@ public class OrcRecordUpdater implements RecordUpdater {
         // When split-update is enabled, we can choose not to write
         // any delta files when there are no inserts. In such cases only the delete_deltas
         // would be written & they are closed separately below.
-        if (writer != null && indexBuilder.acidStats.inserts > 0) {
-          writer.close(); // normal close, when there are inserts.
+        if (indexBuilder.acidStats.inserts > 0) {
+          if (writer != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Closing writer for path: {} acid stats: {}", path, indexBuilder.acidStats);
+            }
+            writer.close(); // normal close, when there are inserts.
+          }
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No insert events in path: {}.. Deleting..", path);
+          }
+          fs.delete(path, false);
         }
       } else {
         //so that we create empty bucket files when needed (but see HIVE-17138)
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initializing writer before close (to create empty buckets) for path: {}", path);
+        }
         initWriter();
         writer.close(); // normal close.
       }
       if (deleteEventWriter != null) {
         if (deleteEventIndexBuilder.acidStats.deletes > 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing delete event writer for path: {} acid stats: {}", path, indexBuilder.acidStats);
+          }
           // Only need to write out & close the delete_delta if there have been any.
           deleteEventWriter.close();
         } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No delete events in path: {}.. Deleting..", path);
+          }
           // Just remove delete_delta, if there have been no delete events.
           fs.delete(deleteEventPath, false);
         }
       }
     }
     if (flushLengths != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing and deleting flush length file for path: {}", path);
+      }
       flushLengths.close();
       fs.delete(OrcAcidUtils.getSideFile(path), false);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/c166202a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 0866850..281f280 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -24,6 +24,8 @@ import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryUsage;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HeapMemoryMonitor;
 import org.apache.hadoop.hive.common.JavaUtils;
@@ -99,6 +102,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   protected boolean autoFlush;
   protected float memoryUsageThreshold;
   protected long ingestSizeThreshold;
+  protected FileSystem fs;
 
   public AbstractRecordWriter(final String lineDelimiter) {
     this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ?
@@ -134,6 +138,16 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     this.conf = conn.getHiveConf();
     this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
     this.table = conn.getTable();
+    String location = table.getSd().getLocation();
+    try {
+      URI uri = new URI(location);
+      this.fs = FileSystem.newInstance(uri, conf);
+      LOG.info("Created new filesystem instance: {}", System.identityHashCode(this.fs));
+    } catch (URISyntaxException e) {
+      throw new StreamingException("Unable to create URI from location: " + location, e);
+    } catch (IOException e) {
+      throw new StreamingException("Unable to get filesystem for location: " + location, e);
+    }
     this.inputColumns = table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList());
     this.inputTypes = table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList());
     if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
@@ -454,6 +468,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     tblProperties.putAll(table.getParameters());
     return acidOutputFormat.getRecordUpdater(partitionPath,
       new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
         .inspector(outputRowObjectInspector)
         .bucket(bucketId)
         .tableProperties(tblProperties)

http://git-wip-us.apache.org/repos/asf/hive/blob/c166202a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f697211..0f9260d 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,6 +200,7 @@ public class HiveStreamingConnection implements StreamingConnection {
     // isolated from the other transaction related RPC calls.
     this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection-heartbeat");
     validateTable();
+
     LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
   }
 
@@ -326,7 +329,12 @@ public class HiveStreamingConnection implements StreamingConnection {
       if (recordWriter == null) {
         throw new StreamingException("Record writer cannot be null for streaming connection");
       }
-      return new HiveStreamingConnection(this);
+      HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this);
+      // Register with a higher priority than the FileSystem shutdown hook so that the streaming connection is
+      // closed before the filesystem is closed (to avoid ClosedChannelException)
+      ShutdownHookManager.addShutdownHook(streamingConnection::close,  FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
+      Thread.setDefaultUncaughtExceptionHandler((t, e) -> streamingConnection.close());
+      return streamingConnection;
     }
   }
 
@@ -539,7 +547,7 @@ public class HiveStreamingConnection implements StreamingConnection {
         currentTransactionBatch.close();
       }
     } catch (StreamingException e) {
-      LOG.error("Unable to close current transaction batch: " + currentTransactionBatch, e);
+      LOG.warn("Unable to close current transaction batch: " + currentTransactionBatch, e);
     } finally {
       getMSC().close();
       getHeatbeatMSC().close();
@@ -818,21 +826,11 @@ public class HiveStreamingConnection implements StreamingConnection {
      * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail).
      * This ensures that a client can't ignore these failures and continue to write.
      */
-    private void markDead(boolean success) {
+    private void markDead(boolean success) throws StreamingException {
       if (success) {
         return;
       }
-      isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
-      try {
-        abort(true);//abort all remaining txns
-      } catch (Exception ex) {
-        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
-      }
-      try {
-        closeImpl();
-      } catch (Exception ex) {
-        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
-      }
+      close();
     }
 
 
@@ -957,9 +955,19 @@ public class HiveStreamingConnection implements StreamingConnection {
       if (isTxnClosed.get()) {
         return;
       }
-      isTxnClosed.set(true);
-      abortImpl(true);
-      closeImpl();
+      isTxnClosed.set(true); //also ensures that heartbeat() is no-op since client is likely doing it async
+      try {
+        abort(true);//abort all remaining txns
+      } catch (Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+        throw new StreamingException("Unable to abort", ex);
+      }
+      try {
+        closeImpl();
+      } catch (Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+        throw new StreamingException("Unable to close", ex);
+      }
     }
 
     private void closeImpl() throws StreamingException {