You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/23 08:09:34 UTC

svn commit: r1470823 - in /hive/trunk: hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/metadata/ ql/src/ja...

Author: hashutosh
Date: Tue Apr 23 06:09:33 2013
New Revision: 1470823

URL: http://svn.apache.org/r1470823
Log:
HIVE-2379 : Hive/HBase integration could be improved (Navis via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
Modified:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1470823&r1=1470822&r2=1470823&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Tue Apr 23 06:09:33 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -268,6 +269,7 @@ public class HBaseStorageHandler extends
       configureTableJobProperties(tableDesc, jobProperties);
   }
 
+  @Override
   public void configureTableJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties) {
@@ -293,6 +295,17 @@ public class HBaseStorageHandler extends
   }
 
   @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    try {
+      TableMapReduceUtil.addDependencyJars(jobConf);
+      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf,
+          HBaseStorageHandler.class);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public DecomposedPredicate decomposePredicate(
     JobConf jobConf,
     Deserializer deserializer,

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java?rev=1470823&r1=1470822&r2=1470823&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java Tue Apr 23 06:09:33 2013
@@ -22,8 +22,8 @@ package org.apache.hcatalog.mapreduce;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.OutputFo
  * The abstract Class HCatStorageHandler would server as the base class for all
  * the storage handlers required for non-native tables in HCatalog.
  */
-public abstract class HCatStorageHandler implements HiveStorageHandler {
+public abstract class HCatStorageHandler extends DefaultStorageHandler {
 
     //TODO move this to HiveStorageHandler
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1470823&r1=1470822&r2=1470823&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Tue Apr 23 06:09:33 2013
@@ -412,7 +412,7 @@ public class ExecDriver extends Task<Map
           LOG.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri());
         }
       }
-
+      work.configureJobConf(job);
       addInputPaths(job, work, emptyScratchDirStr, ctx);
 
       Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1470823&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Tue Apr 23 06:09:33 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+public class OperatorUtils {
+
+  public static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz) {
+    return findOperator(start, clazz, new HashSet<T>());
+  }
+
+  public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
+    Set<T> found = new HashSet<T>();
+    for (Operator<?> start : starts) {
+      findOperator(start, clazz, found);
+    }
+    return found;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Set<T> findOperator(Operator<?> start, Class<T> clazz, Set<T> found) {
+    if (clazz.isInstance(start)) {
+      found.add((T) start);
+    }
+    if (start.getChildOperators() != null) {
+      for (Operator<?> child : start.getChildOperators()) {
+        findOperator(child, clazz, found);
+      }
+    }
+    return found;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1470823&r1=1470822&r2=1470823&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Apr 23 06:09:33 2013
@@ -380,7 +380,7 @@ public final class Utilities {
 
   public static String getHiveJobID(Configuration job) {
     String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
-    if (planPath != null) {
+    if (planPath != null && !planPath.isEmpty()) {
       return (new Path(planPath)).getName();
     }
     return null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java?rev=1470823&r1=1470822&r2=1470823&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java Tue Apr 23 06:09:33 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -87,6 +88,11 @@ public class DefaultStorageHandler imple
   }
 
   @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    //do nothing by default
+  }
+
+  @Override
   public Configuration getConf() {
     return conf;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java?rev=1470823&r1=1470822&r2=1470823&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java Tue Apr 23 06:09:33 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 
 /**
@@ -133,4 +134,12 @@ public interface HiveStorageHandler exte
   public void configureTableJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties);
+
+  /**
+   * Called just before submitting MapReduce job.
+   *
+   * @param tableDesc descriptor for the table being accessed
+   * @param jobConf the JobConf for the MapReduce job
+   */
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf);
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1470823&r1=1470822&r2=1470823&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Tue Apr 23 06:09:33 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -29,13 +30,16 @@ import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * MapredWork.
@@ -557,4 +561,19 @@ public class MapredWork extends Abstract
   public void setFinalMapRed(boolean finalMapRed) {
     this.finalMapRed = finalMapRed;
   }
+
+  public void configureJobConf(JobConf jobConf) {
+    for (PartitionDesc partition : aliasToPartnInfo.values()) {
+      PlanUtils.configureJobConf(partition.getTableDesc(), jobConf);
+    }
+    Collection<Operator<?>> mappers = aliasToWork.values();
+    for (FileSinkOperator fs : OperatorUtils.findOperators(mappers, FileSinkOperator.class)) {
+      PlanUtils.configureJobConf(fs.getConf().getTableInfo(), jobConf);
+    }
+    if (reducer != null) {
+      for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) {
+        PlanUtils.configureJobConf(fs.getConf().getTableInfo(), jobConf);
+      }
+    }
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1470823&r1=1470822&r2=1470823&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Tue Apr 23 06:09:33 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -302,8 +303,8 @@ public final class PlanUtils {
     return new TableDesc(MetadataTypedColumnsetSerDe.class,
         TextInputFormat.class, IgnoreKeyTextOutputFormat.class, Utilities
         .makeProperties(
-        org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT,
-        separatorCode));
+            org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT,
+            separatorCode));
   }
 
   /**
@@ -729,6 +730,19 @@ public final class PlanUtils {
     }
   }
 
+  public static void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    String handlerClass = tableDesc.getProperties().getProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+    try {
+      HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(jobConf, handlerClass);
+      if (storageHandler != null) {
+        storageHandler.configureJobConf(tableDesc, jobConf);
+      }
+    } catch (HiveException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public static String stripQuotes(String val) {
     if ((val.charAt(0) == '\'' && val.charAt(val.length() - 1) == '\'')
         || (val.charAt(0) == '\"' && val.charAt(val.length() - 1) == '\"')) {