You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/08/11 16:14:16 UTC

hive git commit: HIVE-14270: Write temporary data to HDFS when doing inserts on tables located on S3 (Sergio Pena, reviewed by Ashutosh Chauhan, Lefty Leverenz)

Repository: hive
Updated Branches:
  refs/heads/master 546fe8775 -> 1bdbdc4dd


HIVE-14270: Write temporary data to HDFS when doing inserts on tables located on S3 (Sergio Pena, reviewed by Ashutosh Chauhan, Lefty Leverenz)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1bdbdc4d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1bdbdc4d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1bdbdc4d

Branch: refs/heads/master
Commit: 1bdbdc4ddfc4e66ac06a99ce930a87cf52307440
Parents: 546fe87
Author: Sergio Pena <se...@cloudera.com>
Authored: Wed Jul 27 10:34:08 2016 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Thu Aug 11 11:12:20 2016 -0500

----------------------------------------------------------------------
 .../hadoop/hive/common/BlobStorageUtils.java    | 54 +++++++++++
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 11 ++-
 .../hive/common/TestBlobStorageUtils.java       | 95 ++++++++++++++++++++
 .../java/org/apache/hadoop/hive/ql/Context.java | 20 +++++
 .../hive/ql/optimizer/GenMapRedUtils.java       |  9 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  9 +-
 .../apache/hadoop/hive/ql/exec/TestContext.java | 63 +++++++++++++
 7 files changed, 253 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
new file mode 100644
index 0000000..6ca35e2
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.Collection;
+
+/**
+ * Utilities for different blob (object) storage systems.
+ */
+public final class BlobStorageUtils {
+  private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false;
+
+  private BlobStorageUtils() {
+    // Static utility class; not meant to be instantiated.
+  }
+
+  /**
+   * Returns whether {@code path} uses one of the configured blob storage schemes.
+   *
+   * @param conf configuration holding {@code hive.blobstore.supported.schemes}
+   * @param path path to check; may be {@code null}
+   * @return {@code true} if the path's URI scheme is a configured blob storage scheme;
+   *         {@code false} if {@code path} is {@code null} or has no or another scheme
+   */
+  public static boolean isBlobStoragePath(final Configuration conf, final Path path) {
+    return path != null && isBlobStorageScheme(conf, path.toUri().getScheme());
+  }
+
+  /**
+   * Returns whether {@code fs} reports one of the configured blob storage schemes.
+   *
+   * @param conf configuration holding {@code hive.blobstore.supported.schemes}
+   * @param fs file system to check; may be {@code null}
+   * @return {@code true} if the file system's scheme is a configured blob storage scheme
+   */
+  public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) {
+    return fs != null && isBlobStorageScheme(conf, fs.getScheme());
+  }
+
+  /**
+   * Returns whether {@code scheme} is listed in {@code hive.blobstore.supported.schemes}.
+   *
+   * <p>The configured value is a comma-separated list. Whitespace around each entry is ignored,
+   * so a value such as {@code "s3, s3a"} matches both {@code s3} and {@code s3a}.
+   *
+   * @param conf configuration holding the supported schemes list
+   * @param scheme URI scheme to look up; may be {@code null}
+   * @return {@code true} if {@code scheme} is non-null and listed as a blob storage scheme
+   */
+  public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) {
+    if (scheme == null) {
+      return false;
+    }
+    Collection<String> supportedBlobStoreSchemes =
+        conf.getTrimmedStringCollection(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);
+    return supportedBlobStoreSchemes.contains(scheme);
+  }
+
+  /**
+   * Returns whether scratch directories may be placed directly on blob storage, as controlled
+   * by {@code hive.blobstore.use.blobstore.as.scratchdir}.
+   *
+   * @param conf configuration to read
+   * @return the configured flag, or {@code false} when it is unset
+   */
+  public static boolean isBlobStorageAsScratchDir(final Configuration conf) {
+    return conf.getBoolean(
+        HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname,
+        DISABLE_BLOBSTORAGE_AS_SCRATCHDIR);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 70816bd..3e9f6ec 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3084,7 +3084,16 @@ public class HiveConf extends Configuration {
     HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s",
         new TimeValidator(TimeUnit.SECONDS),
         "Timeout for Running Query in seconds. A nonpositive value means infinite. " +
-        "If the query timeout is also set by thrift API call, the smaller one will be taken.");
+        "If the query timeout is also set by thrift API call, the smaller one will be taken."),
+
+    /* BLOBSTORE section */
+
+    HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
+            "Comma-separated list of supported blobstore schemes."),
+
+    HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false,
+            "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties).");
+
 
     public final String varname;
     private final String altName;

http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java
new file mode 100644
index 0000000..84a0d86
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.hive.common.BlobStorageUtils.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link BlobStorageUtils}.
+ */
+public class TestBlobStorageUtils {
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    // A fresh configuration per test keeps the cases independent of each other.
+    conf = new Configuration();
+    conf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname, "s3a,swift");
+    conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, false);
+  }
+
+  @Test
+  public void testValidAndInvalidPaths() throws IOException {
+    // Valid paths
+    assertTrue(isBlobStoragePath(conf, new Path("s3a://bucket/path")));
+    assertTrue(isBlobStoragePath(conf, new Path("swift://bucket/path")));
+
+    // Invalid paths: no scheme, unknown scheme, null path, and an empty URI
+    assertFalse(isBlobStoragePath(conf, new Path("/tmp/a-path")));
+    assertFalse(isBlobStoragePath(conf, new Path("s3fs://tmp/file")));
+    assertFalse(isBlobStoragePath(conf, null));
+    assertFalse(isBlobStoragePath(conf, new Path(URI.create(""))));
+  }
+
+  @Test
+  public void testValidAndInvalidFileSystems() {
+    FileSystem fs = mock(FileSystem.class);
+
+    /* Valid FileSystem schemes */
+
+    doReturn("s3a").when(fs).getScheme();
+    assertTrue(isBlobStorageFileSystem(conf, fs));
+
+    doReturn("swift").when(fs).getScheme();
+    assertTrue(isBlobStorageFileSystem(conf, fs));
+
+    /* Invalid FileSystem schemes */
+
+    doReturn("hdfs").when(fs).getScheme();
+    assertFalse(isBlobStorageFileSystem(conf, fs));
+
+    doReturn("").when(fs).getScheme();
+    assertFalse(isBlobStorageFileSystem(conf, fs));
+
+    assertFalse(isBlobStorageFileSystem(conf, null));
+  }
+
+  @Test
+  public void testValidAndInvalidSchemes() {
+    // Valid schemes
+    assertTrue(isBlobStorageScheme(conf, "s3a"));
+    assertTrue(isBlobStorageScheme(conf, "swift"));
+
+    // Invalid schemes
+    assertFalse(isBlobStorageScheme(conf, "hdfs"));
+    assertFalse(isBlobStorageScheme(conf, ""));
+    assertFalse(isBlobStorageScheme(conf, null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 89893eb..3785b1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -351,6 +352,25 @@ public class Context {
     }
   }
 
+  /**
+   * Returns a temporary directory for data whose final destination is {@code path}:
+   * - if {@code path} uses a configured blob storage scheme and blob storage is not allowed
+   *   as a scratch directory ({@code hive.blobstore.use.blobstore.as.scratchdir} is false),
+   *   the MR scratch directory from {@link #getMRTmpPath()} is returned (usually on HDFS);
+   * - otherwise, a path relative to {@code path} from {@link #getExtTmpPathRelTo(Path)}.
+   *
+   * @param path path used to choose the file system for the temporary directory
+   * @return a path to the new temporary directory
+   */
+  public Path getTempDirForPath(Path path) {
+    boolean redirectToScratchDir = BlobStorageUtils.isBlobStoragePath(conf, path)
+        && !BlobStorageUtils.isBlobStorageAsScratchDir(conf);
+    // For better write performance, temporary data for blob storage destinations goes to the
+    // scratch directory (usually HDFS). The scratch directory configuration must point to a
+    // non-blobstore file system to take advantage of this.
+    return redirectToScratchDir ? getMRTmpPath() : getExtTmpPathRelTo(path);
+  }
+
   private Path getExternalScratchDir(URI extURI) {
     return getStagingDir(new Path(extURI.getScheme(), extURI.getAuthority(), extURI.getPath()), !explain);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 5bd7886..cea99e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -36,6 +36,7 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -1796,15 +1797,17 @@ public final class GenMapRedUtils {
     Path dest = null;
 
     if (chDir) {
-      dest = fsOp.getConf().getFinalDirName();
+      FileSinkDesc fileSinkDesc = fsOp.getConf();
+      dest = fileSinkDesc.getFinalDirName();
 
       // generate the temporary file
       // it must be on the same file system as the current destination
       Context baseCtx = parseCtx.getContext();
 
-      Path tmpDir = baseCtx.getExternalTmpPath(dest);
+      // Create the required temporary file in the HDFS location if the destination
+      // path of the FileSinkOperator table is a blobstore path.
+      Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath());
 
-      FileSinkDesc fileSinkDesc = fsOp.getConf();
       // Change all the linked file sink descriptors
       if (fileSinkDesc.isLinkedFileSink()) {
         for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a01a7bd..6758741 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -6642,7 +6643,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if (isNonNativeTable) {
         queryTmpdir = dest_path;
       } else {
-        queryTmpdir = ctx.getExtTmpPathRelTo(dest_path);
+        queryTmpdir = ctx.getTempDirForPath(dest_path);
       }
       if (dpCtx != null) {
         // set the root of the temporary path where dynamic partition columns will populate
@@ -6759,7 +6760,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
           .getAuthority(), partPath.toUri().getPath());
 
-      queryTmpdir = ctx.getExternalTmpPath(dest_path);
+      queryTmpdir = ctx.getTempDirForPath(dest_path);
       table_desc = Utilities.getTableDesc(dest_tab);
 
       // Add sorting/bucketing if needed
@@ -6807,7 +6808,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
         try {
           Path qPath = FileUtils.makeQualified(dest_path, conf);
-          queryTmpdir = ctx.getExtTmpPathRelTo(qPath);
+          queryTmpdir = ctx.getTempDirForPath(qPath);
         } catch (Exception e) {
           throw new SemanticException("Error creating temporary folder on: "
               + dest_path, e);
@@ -7015,7 +7016,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
     if (!destTableIsMaterialization &&
             HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
-      String statsTmpLoc = ctx.getExtTmpPathRelTo(queryTmpdir).toString();
+      String statsTmpLoc = ctx.getTempDirForPath(dest_path).toString();
       fileSinkDesc.setStatsTmpDir(statsTmpLoc);
       LOG.debug("Set stats collection dir : " + statsTmpLoc);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
new file mode 100644
index 0000000..4a4c240
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link Context#getTempDirForPath(Path)}.
+ */
+public class TestContext {
+  private static final HiveConf conf = new HiveConf();
+
+  private Context context;
+
+  @Before
+  public void setUp() throws IOException {
+    /* Only called to create session directories used by the Context class */
+    SessionState.start(conf);
+    SessionState.detachSession();
+
+    context = new Context(conf);
+  }
+
+  @Test
+  public void testGetScratchDirectoriesForPaths() throws IOException {
+    Context spyContext = spy(context);
+
+    // When object store paths are used, getMRTmpPath() is called to get a temporary
+    // directory in the default scratch directory location (usually /tmp)
+    Path mrTmpPath = new Path("hdfs://hostname/tmp/scratch");
+    doReturn(mrTmpPath).when(spyContext).getMRTmpPath();
+    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("s3a://bucket/dir")));
+
+    // When non-object store paths are used, getExtTmpPathRelTo() is called to get a temporary
+    // directory relative to the path passed as a parameter
+    Path tmpPathRelTo = new Path("hdfs://hostname/user");
+    doReturn(tmpPathRelTo).when(spyContext).getExtTmpPathRelTo(any(Path.class));
+    assertEquals(tmpPathRelTo, spyContext.getTempDirForPath(new Path("/user")));
+  }
+}