You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/26 22:42:44 UTC

svn commit: r1517690 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec: MapJoinOperator.java ObjectCache.java ObjectCacheFactory.java mr/ExecMapper.java mr/ExecReducer.java mr/ObjectCache.java tez/DagUtils.java tez/ObjectCache.java

Author: gunther
Date: Mon Aug 26 20:42:43 2013
New Revision: 1517690

URL: http://svn.apache.org/r1517690
Log:
HIVE-5151: Going green: Container re-cycling in Tez (Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1517690&r1=1517689&r2=1517690&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Mon Aug 26 20:42:43 2013
@@ -52,6 +52,8 @@ public class MapJoinOperator extends Abs
   private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
 
 
+  private transient String hashMapKey;
+  private transient ObjectCache cache;
   protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
 
   protected static MapJoinMetaData metadata = new MapJoinMetaData();
@@ -82,6 +84,9 @@ public class MapJoinOperator extends Abs
 
     super.initializeOp(hconf);
 
+    hashMapKey = "__HASH_MAP_"+this.getOperatorId();
+    cache = ObjectCacheFactory.getCache(hconf);
+
     metadataValueTag = new int[numAliases];
     for (int pos = 0; pos < numAliases; pos++) {
       metadataValueTag[pos] = -1;
@@ -91,22 +96,29 @@ public class MapJoinOperator extends Abs
 
     int tagLen = conf.getTagLength();
 
-    mapJoinTables = new HashMapWrapper[tagLen];
     rowContainerMap = new MapJoinRowContainer[tagLen];
-    // initialize the hash tables for other tables
-    for (int pos = 0; pos < numAliases; pos++) {
-      if (pos == posBigTable) {
-        continue;
-      }
 
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
+    mapJoinTables = (HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[]) cache.retrieve(hashMapKey);
+    hashTblInitedOnce = true;
 
-      mapJoinTables[pos] = hashTable;
-      MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
-      rowContainerMap[pos] = rowContainer;
-    }
+    if (mapJoinTables == null) {
+      mapJoinTables = new HashMapWrapper[tagLen];
 
-    hashTblInitedOnce = false;
+      // initialize the hash tables for other tables
+      for (int pos = 0; pos < numAliases; pos++) {
+        if (pos == posBigTable) {
+          continue;
+        }
+
+        HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
+
+        mapJoinTables[pos] = hashTable;
+        MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
+        rowContainerMap[pos] = rowContainer;
+      }
+
+      hashTblInitedOnce = false;
+    }
   }
 
   @Override
@@ -197,6 +209,7 @@ public class MapJoinOperator extends Abs
       LOG.error("Load Distributed Cache Error", e);
       throw new HiveException(e);
     }
+    cache.cache(hashMapKey, mapJoinTables);
   }
 
   // Load the hash table

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java?rev=1517690&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java Mon Aug 26 20:42:43 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+/**
+ * ObjectCache. Interface for maintaining objects associated with a task.
+ */
+public interface ObjectCache {
+  /**
+   * Add an object to the cache
+   * @param key
+   * @param value
+   */
+  public void cache(String key, Object value);
+
+  /**
+   * Retrieve object from cache.
+   * @param key
+   * @return the last cached object with the key, null if none.
+   */
+  public Object retrieve(String key);
+}

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java?rev=1517690&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java Mon Aug 26 20:42:43 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * ObjectCacheFactory returns the appropriate cache depending on settings in
+ * the hive conf.
+ */
+public class ObjectCacheFactory {
+
+  private ObjectCacheFactory() {
+    // avoid instantiation
+  }
+
+  /**
+   * Returns the appropriate cache
+   */
+  public static ObjectCache getCache(Configuration conf) {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ)) {
+      return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
+    } else {
+      return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+    }
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1517690&r1=1517689&r2=1517690&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Mon Aug 26 20:42:43 2013
@@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -57,6 +59,7 @@ import org.apache.hadoop.util.StringUtil
  */
 public class ExecMapper extends MapReduceBase implements Mapper {
 
+  private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
   private Map<String, FetchOperator> fetchOperators;
   private OutputCollector oc;
@@ -92,11 +95,20 @@ public class ExecMapper extends MapReduc
     } catch (Exception e) {
       l4j.info("cannot get classpath: " + e.getMessage());
     }
+
+    setDone(false);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(job);
+
     try {
       jc = job;
       execContext.setJc(jc);
       // create map and fetch operators
-      MapWork mrwork = Utilities.getMapWork(job);
+      MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
+      if (mrwork == null) {
+        mrwork = Utilities.getMapWork(job);
+        cache.cache(PLAN_KEY, mrwork);
+      }
       mo = new MapOperator();
       mo.setConf(mrwork);
       // initialize map operator

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1517690&r1=1517689&r2=1517690&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Mon Aug 26 20:42:43 2013
@@ -30,6 +30,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
@@ -64,6 +66,8 @@ import org.apache.hadoop.util.StringUtil
  */
 public class ExecReducer extends MapReduceBase implements Reducer {
 
+  private static final String PLAN_KEY = "__REDUCE_PLAN__";
+
   private JobConf jc;
   private OutputCollector<?, ?> oc;
   private Operator<?> reducer;
@@ -112,7 +116,14 @@ public class ExecReducer extends MapRedu
       l4j.info("cannot get classpath: " + e.getMessage());
     }
     jc = job;
-    ReduceWork gWork = Utilities.getReduceWork(job);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(jc);
+    ReduceWork gWork = (ReduceWork) cache.retrieve(PLAN_KEY);
+    if (gWork == null) {
+      gWork = Utilities.getReduceWork(job);
+      cache.cache(PLAN_KEY, gWork);
+    }
+
     reducer = gWork.getReducer();
     reducer.setParentOperators(null); // clear out any parents as reducer is the
     // root

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java?rev=1517690&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java Mon Aug 26 20:42:43 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * ObjectCache. No-op implementation on MR we don't have a means to reuse
+ * Objects between runs of the same task.
+ *
+ */
+public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
+
+  private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+
+  @Override
+  public void cache(String key, Object value) {
+    LOG.info("Ignoring cache key: "+key);
+  }
+
+  @Override
+  public Object retrieve(String key) {
+    LOG.info("Ignoring retrieval request: "+key);
+    return null;
+  }
+
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1517690&r1=1517689&r2=1517690&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Aug 26 20:42:43 2013
@@ -578,7 +578,7 @@ public class DagUtils {
    * createTezDir creates a temporary directory in the scratchDir folder to
    * be used with Tez. Assumes scratchDir exists.
    */
-  public static Path createTezDir(Path scratchDir, Configuration conf) 
+  public static Path createTezDir(Path scratchDir, Configuration conf)
       throws IOException {
     Path tezDir = getTezDir(scratchDir);
     FileSystem fs = tezDir.getFileSystem(conf);

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1517690&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Mon Aug 26 20:42:43 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
+import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory;
+
+
+/**
+ * ObjectCache. Tez implementation based on the tez object registry.
+ *
+ */
+public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
+
+  private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+  private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+
+  @Override
+  public void cache(String key, Object value) {
+    LOG.info("Adding " + key + " to cache");
+    registry.add(ObjectLifeCycle.VERTEX, key, value);
+  }
+
+  @Override
+  public Object retrieve(String key) {
+    Object o = registry.get(key);
+    LOG.info("Found " + key + " in cache with value: " + o);
+    return o;
+  }
+}