You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/11/11 07:46:23 UTC

svn commit: r1033824 - in /hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/

Author: namit
Date: Thu Nov 11 06:46:22 2010
New Revision: 1033824

URL: http://svn.apache.org/viewvc?rev=1033824&view=rev
Log:
HIVE-1758 Reduce memory for groupby map-side hashmap
(Siying Dong via namit)


Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
Modified:
    hive/trunk/CHANGES.txt
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java

Modified: hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hive/trunk/CHANGES.txt?rev=1033824&r1=1033823&r2=1033824&view=diff
==============================================================================
--- hive/trunk/CHANGES.txt (original)
+++ hive/trunk/CHANGES.txt Thu Nov 11 06:46:22 2010
@@ -235,6 +235,9 @@ Trunk -  Unreleased
     HIVE-1743 Compare from the end for GroupBy
     (Siying Dong via namit)
 
+    HIVE-1758 Reduce memory for groupby map-side hashmap
+    (Siying Dong via namit)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1033824&r1=1033823&r2=1033824&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Thu Nov 11 06:46:22 2010
@@ -44,7 +44,6 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyStringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectsEqualComparer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -73,7 +72,6 @@ public class GroupByOperator extends Ope
 
   protected transient ExprNodeEvaluator[] keyFields;
   protected transient ObjectInspector[] keyObjectInspectors;
-  protected transient Object[] keyObjects;
 
   protected transient ExprNodeEvaluator[][] aggregationParameterFields;
   protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
@@ -99,10 +97,11 @@ public class GroupByOperator extends Ope
   protected transient ArrayList<ObjectInspector> objectInspectors;
   transient ArrayList<String> fieldNames;
 
+  transient KeyWrapperFactory keyWrapperFactory;
   // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
   // MERGEPARTIAL
-  protected transient ArrayList<Object> currentKeys;
-  protected transient ArrayList<Object> newKeys;
+  protected transient KeyWrapper currentKeys;
+  protected transient KeyWrapper newKeys;
   protected transient AggregationBuffer[] aggregations;
   protected transient Object[][] aggregationsParametersLastInvoke;
 
@@ -110,7 +109,7 @@ public class GroupByOperator extends Ope
   protected transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
 
   // Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
-  protected transient HashSet<ArrayList<Object>> keysCurrentGroup;
+  protected transient HashSet<KeyWrapper> keysCurrentGroup;
 
   transient boolean bucketGroup;
 
@@ -134,8 +133,6 @@ public class GroupByOperator extends Ope
   // new Key ObjectInspectors are objectInspectors from the parent
   transient StructObjectInspector newKeyObjectInspector;
   transient StructObjectInspector currentKeyObjectInspector;
-  transient ListObjectsEqualComparer currentStructEqualComparer;
-  transient ListObjectsEqualComparer newKeyStructEqualComparer;
 
   /**
    * This is used to store the position and field names for variable length
@@ -192,16 +189,13 @@ public class GroupByOperator extends Ope
     keyFields = new ExprNodeEvaluator[conf.getKeys().size()];
     keyObjectInspectors = new ObjectInspector[conf.getKeys().size()];
     currentKeyObjectInspectors = new ObjectInspector[conf.getKeys().size()];
-    keyObjects = new Object[conf.getKeys().size()];
     for (int i = 0; i < keyFields.length; i++) {
       keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
       keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
       currentKeyObjectInspectors[i] = ObjectInspectorUtils
           .getStandardObjectInspector(keyObjectInspectors[i],
           ObjectInspectorCopyOption.WRITABLE);
-      keyObjects[i] = null;
     }
-    newKeys = new ArrayList<Object>(keyFields.length);
 
     // initialize unionExpr for reduce-side
     // reduce KEY has union field as the last field if there are distinct
@@ -344,7 +338,7 @@ public class GroupByOperator extends Ope
           HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
       groupKeyIsNotReduceKey = conf.getGroupKeyNotReductionKey();
       if (groupKeyIsNotReduceKey) {
-        keysCurrentGroup = new HashSet<ArrayList<Object>>();
+        keysCurrentGroup = new HashSet<KeyWrapper>();
       }
     }
 
@@ -365,12 +359,14 @@ public class GroupByOperator extends Ope
     currentKeyObjectInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(keyNames, Arrays
         .asList(currentKeyObjectInspectors));
-    currentStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, currentKeyObjectInspectors);
-    newKeyStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, keyObjectInspectors);
 
     outputObjInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(fieldNames, objectInspectors);
 
+    keyWrapperFactory = new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
+
+    newKeys = keyWrapperFactory.getKeyWrapper();
+
     firstRow = true;
     // estimate the number of hash table entries based on the size of each
     // entry. Since the size of a entry
@@ -713,17 +709,9 @@ public class GroupByOperator extends Ope
     try {
       countAfterReport++;
 
-      // Compute the keys
-      newKeys.clear();
-      for (int i = 0; i < keyFields.length; i++) {
-        if (keyObjectInspectors[i] == null) {
-          keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
-        }
-        keyObjects[i] = keyFields[i].evaluate(row);
-        newKeys.add(keyObjects[i]);
-      }
-
+      newKeys.getNewKey(row, rowInspector);
       if (hashAggr) {
+        newKeys.setHashKey();
         processHashAggr(row, rowInspector, newKeys);
       } else {
         processAggr(row, rowInspector, newKeys);
@@ -743,84 +731,16 @@ public class GroupByOperator extends Ope
     }
   }
 
-  private static ArrayList<Object> deepCopyElements(Object[] keys,
-      ObjectInspector[] keyObjectInspectors,
-      ObjectInspectorCopyOption copyOption) {
-    ArrayList<Object> result = new ArrayList<Object>(keys.length);
-    deepCopyElements(keys, keyObjectInspectors, result, copyOption);
-    return result;
-  }
-
-  private static void deepCopyElements(Object[] keys,
-      ObjectInspector[] keyObjectInspectors, ArrayList<Object> result,
-      ObjectInspectorCopyOption copyOption) {
-    result.clear();
-    for (int i = 0; i < keys.length; i++) {
-      result.add(ObjectInspectorUtils.copyToStandardObject(keys[i],
-          keyObjectInspectors[i], copyOption));
-    }
-  }
-
-  class KeyWrapper {
-    int hashcode;
-    ArrayList<Object> keys;
-    // decide whether this is already in hashmap (keys in hashmap are deepcopied
-    // version, and we need to use 'currentKeyObjectInspector').
-    boolean copy = false;
-
-    KeyWrapper() {
-    }
-
-    public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys) {
-      this(hashcode, copiedKeys, false);
-    }
-
-    public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys,
-        boolean inHashMap) {
-      super();
-      this.hashcode = hashcode;
-      keys = copiedKeys;
-      copy = inHashMap;
-    }
-
-    @Override
-    public int hashCode() {
-      return hashcode;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      ArrayList<Object> copied_in_hashmap = ((KeyWrapper) obj).keys;
-      if (!copy) {
-        return newKeyStructEqualComparer.areEqual(copied_in_hashmap, keys);
-      } else {
-        return currentStructEqualComparer.areEqual(copied_in_hashmap, keys);
-      }
-    }
-  }
-
-
-
-  KeyWrapper keyProber = new KeyWrapper();
-
   private void processHashAggr(Object row, ObjectInspector rowInspector,
-      ArrayList<Object> newKeys) throws HiveException {
+      KeyWrapper newKeys) throws HiveException {
     // Prepare aggs for updating
     AggregationBuffer[] aggs = null;
     boolean newEntryForHashAggr = false;
 
-    keyProber.hashcode = newKeys.hashCode();
-    // use this to probe the hashmap
-    keyProber.keys = newKeys;
-
     // hash-based aggregations
-    aggs = hashAggregations.get(keyProber);
-    ArrayList<Object> newDefaultKeys = null;
+    aggs = hashAggregations.get(newKeys);
     if (aggs == null) {
-      newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors,
-          ObjectInspectorCopyOption.WRITABLE);
-      KeyWrapper newKeyProber = new KeyWrapper(keyProber.hashcode,
-          newDefaultKeys, true);
+      KeyWrapper newKeyProber = newKeys.copyKey();
       aggs = newAggregations();
       hashAggregations.put(newKeyProber, aggs);
       newEntryForHashAggr = true;
@@ -833,11 +753,7 @@ public class GroupByOperator extends Ope
     // Peek into the set to find out if a new grouping key is seen for the given
     // reduction key
     if (groupKeyIsNotReduceKey) {
-      if (newDefaultKeys == null) {
-        newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors,
-            ObjectInspectorCopyOption.WRITABLE);
-      }
-      newEntryForHashAggr = keysCurrentGroup.add(newDefaultKeys);
+      newEntryForHashAggr = keysCurrentGroup.add(newKeys.copyKey());
     }
 
     // Update the aggs
@@ -859,27 +775,30 @@ public class GroupByOperator extends Ope
 
   // Non-hash aggregation
   private void processAggr(Object row, ObjectInspector rowInspector,
-      ArrayList<Object> newKeys) throws HiveException {
+      KeyWrapper newKeys) throws HiveException {
     // Prepare aggs for updating
     AggregationBuffer[] aggs = null;
     Object[][] lastInvoke = null;
+    //boolean keysAreEqual = (currentKeys != null && newKeys != null)?
+    //  newKeyStructEqualComparer.areEqual(currentKeys, newKeys) : false;
+
     boolean keysAreEqual = (currentKeys != null && newKeys != null)?
-      newKeyStructEqualComparer.areEqual(currentKeys, newKeys) : false;
+        newKeys.equals(currentKeys) : false;
 
 
     // Forward the current keys if needed for sort-based aggregation
     if (currentKeys != null && !keysAreEqual) {
-      forward(currentKeys, aggregations);
+      forward(currentKeys.getKeyArray(), aggregations);
       countAfterReport = 0;
     }
 
     // Need to update the keys?
     if (currentKeys == null || !keysAreEqual) {
       if (currentKeys == null) {
-        currentKeys = new ArrayList<Object>(keyFields.length);
+        currentKeys = newKeys.copyKey();
+      } else {
+        currentKeys.copyKey(newKeys);
       }
-      deepCopyElements(keyObjects, keyObjectInspectors, currentKeys,
-          ObjectInspectorCopyOption.WRITABLE);
 
       // Reset the aggregations
       resetAggregations(aggregations);
@@ -904,14 +823,14 @@ public class GroupByOperator extends Ope
    * @param newKeys
    *          keys for the row under consideration
    **/
-  private boolean shouldBeFlushed(ArrayList<Object> newKeys) {
+  private boolean shouldBeFlushed(KeyWrapper newKeys) {
     int numEntries = hashAggregations.size();
 
     // The fixed size for the aggregation class is already known. Get the
     // variable portion of the size every NUMROWSESTIMATESIZE rows.
     if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
       for (Integer pos : keyPositionsSize) {
-        Object key = newKeys.get(pos.intValue());
+        Object key = newKeys.getKeyArray()[pos.intValue()];
         // Ignore nulls
         if (key != null) {
           if (key instanceof LazyPrimitive) {
@@ -928,8 +847,7 @@ public class GroupByOperator extends Ope
 
       AggregationBuffer[] aggs = null;
       if (aggrPositions.size() > 0) {
-	KeyWrapper newKeyProber = new KeyWrapper(
-	    newKeys.hashCode(), newKeys);
+        KeyWrapper newKeyProber = newKeys.copyKey();
         aggs = hashAggregations.get(newKeyProber);
       }
 
@@ -975,7 +893,7 @@ public class GroupByOperator extends Ope
           .entrySet().iterator();
       while (iter.hasNext()) {
         Map.Entry<KeyWrapper, AggregationBuffer[]> m = iter.next();
-        forward(m.getKey().keys, m.getValue());
+        forward(m.getKey().getKeyArray(), m.getValue());
       }
       hashAggregations.clear();
       hashAggregations = null;
@@ -990,7 +908,7 @@ public class GroupByOperator extends Ope
     int numDel = 0;
     while (iter.hasNext()) {
       Map.Entry<KeyWrapper, AggregationBuffer[]> m = iter.next();
-      forward(m.getKey().keys, m.getValue());
+      forward(m.getKey().getKeyArray(), m.getValue());
       iter.remove();
       numDel++;
       if (numDel * 10 >= oldSize) {
@@ -1009,19 +927,20 @@ public class GroupByOperator extends Ope
    *          The keys in the record
    * @throws HiveException
    */
-  protected void forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
+  protected void forward(Object[] keys, AggregationBuffer[] aggs)
       throws HiveException {
-    int totalFields = keys.size() + aggs.length;
+    int totalFields = keys.length+ aggs.length;
     if (forwardCache == null) {
       forwardCache = new Object[totalFields];
     }
-    for (int i = 0; i < keys.size(); i++) {
-      forwardCache[i] = keys.get(i);
+    for (int i = 0; i < keys.length; i++) {
+      forwardCache[i] = keys[i];
     }
     for (int i = 0; i < aggs.length; i++) {
-      forwardCache[keys.size() + i] = aggregationEvaluators[i]
+      forwardCache[keys.length + i] = aggregationEvaluators[i]
           .evaluate(aggs[i]);
     }
+
     forward(forwardCache, outputObjInspector);
   }
 
@@ -1058,7 +977,7 @@ public class GroupByOperator extends Ope
           }
 
           // create dummy keys - size 0
-          forward(new ArrayList<Object>(0), aggregations);
+          forward(new Object[0], aggregations);
         } else {
           if (hashAggregations != null) {
             LOG.warn("Begin Hash Table flush at close: size = "
@@ -1067,14 +986,15 @@ public class GroupByOperator extends Ope
             while (iter.hasNext()) {
               Map.Entry<KeyWrapper, AggregationBuffer[]> m = (Map.Entry) iter
                   .next();
-              forward(m.getKey().keys, m.getValue());
+
+              forward(m.getKey().getKeyArray(), m.getValue());
               iter.remove();
             }
             hashAggregations.clear();
           } else if (aggregations != null) {
             // sort-based aggregations
             if (currentKeys != null) {
-              forward(currentKeys, aggregations);
+              forward(currentKeys.getKeyArray(), aggregations);
             }
             currentKeys = null;
           } else {

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java?rev=1033824&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java Thu Nov 11 06:46:22 2010
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+public abstract class KeyWrapper {
+  abstract void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException;
+  abstract void setHashKey();
+  abstract KeyWrapper copyKey();
+  abstract void copyKey(KeyWrapper oldWrapper);
+  abstract Object[] getKeyArray();
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java?rev=1033824&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java Thu Nov 11 06:46:22 2010
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectsEqualComparer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+
+public class KeyWrapperFactory {
+  public KeyWrapperFactory(ExprNodeEvaluator[] keyFields, ObjectInspector[] keyObjectInspectors,
+      ObjectInspector[] currentKeyObjectInspectors) {
+    this.keyFields = keyFields;
+    this.keyObjectInspectors = keyObjectInspectors;
+    this.currentKeyObjectInspectors = currentKeyObjectInspectors;
+
+  }
+
+  public KeyWrapper getKeyWrapper() {
+    if (keyFields.length == 1
+        && TypeInfoUtils.getTypeInfoFromObjectInspector(keyObjectInspectors[0]).equals(
+            TypeInfoFactory.stringTypeInfo)) {
+      assert(TypeInfoUtils.getTypeInfoFromObjectInspector(currentKeyObjectInspectors[0]).equals(
+            TypeInfoFactory.stringTypeInfo));
+      soi_new = (StringObjectInspector) keyObjectInspectors[0];
+      soi_copy = (StringObjectInspector) currentKeyObjectInspectors[0];
+      return new TextKeyWrapper(false);
+    } else {
+      currentStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, currentKeyObjectInspectors);
+      newKeyStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, keyObjectInspectors);
+      return new ListKeyWrapper(false);
+    }
+  }
+
+  transient ExprNodeEvaluator[] keyFields;
+  transient ObjectInspector[] keyObjectInspectors;
+  transient ObjectInspector[] currentKeyObjectInspectors;
+
+
+  transient ListObjectsEqualComparer currentStructEqualComparer;
+  transient ListObjectsEqualComparer newKeyStructEqualComparer;
+
+  class ListKeyWrapper extends KeyWrapper {
+    int hashcode;
+    Object[] keys;
+    // decide whether this is already in hashmap (keys in hashmap are deepcopied
+    // version, and we need to use 'currentKeyObjectInspector').
+    ListObjectsEqualComparer equalComparer;
+
+    public ListKeyWrapper(boolean isCopy) {
+      this(-1, new Object[keyFields.length], isCopy);
+    }
+
+    private ListKeyWrapper(int hashcode, Object[] copiedKeys,
+        boolean isCopy) {
+      super();
+      this.hashcode = hashcode;
+      keys = copiedKeys;
+      setEqualComparer(isCopy);
+    }
+
+    private void setEqualComparer(boolean copy) {
+      if (!copy) {
+        equalComparer = newKeyStructEqualComparer;
+      } else {
+        equalComparer = currentStructEqualComparer;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return hashcode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      Object[] copied_in_hashmap = ((ListKeyWrapper) obj).keys;
+      return equalComparer.areEqual(copied_in_hashmap, keys);
+    }
+
+    @Override
+    public void setHashKey() {
+      hashcode = Arrays.hashCode(keys);
+    }
+
+    @Override
+    public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+      // Compute the keys
+      for (int i = 0; i < keyFields.length; i++) {
+        keys[i]  = keyFields[i].evaluate(row);
+      }
+    }
+
+    @Override
+    public KeyWrapper copyKey() {
+      Object[] newDefaultKeys = deepCopyElements(keys, keyObjectInspectors,
+          ObjectInspectorCopyOption.WRITABLE);
+      return new ListKeyWrapper(hashcode, newDefaultKeys, true);
+    }
+
+    @Override
+    public void copyKey(KeyWrapper oldWrapper) {
+      ListKeyWrapper listWrapper = (ListKeyWrapper) oldWrapper;
+      hashcode = listWrapper.hashcode;
+      equalComparer = currentStructEqualComparer;
+      deepCopyElements(listWrapper.keys, keyObjectInspectors, keys,
+          ObjectInspectorCopyOption.WRITABLE);
+    }
+
+    @Override
+    public Object[] getKeyArray() {
+      return keys;
+    }
+
+    private Object[] deepCopyElements(Object[] keys,
+        ObjectInspector[] keyObjectInspectors,
+        ObjectInspectorCopyOption copyOption) {
+      Object[] result = new Object[keys.length];
+      deepCopyElements(keys, keyObjectInspectors, result, copyOption);
+      return result;
+    }
+
+    private void deepCopyElements(Object[] keys,
+        ObjectInspector[] keyObjectInspectors, Object[] result,
+        ObjectInspectorCopyOption copyOption) {
+      for (int i = 0; i < keys.length; i++) {
+        result[i] = ObjectInspectorUtils.copyToStandardObject(keys[i],
+            keyObjectInspectors[i], copyOption);
+      }
+    }
+  }
+
+  transient Object[] singleEleArray = new Object[1];
+  transient StringObjectInspector soi_new, soi_copy;
+
+  class TextKeyWrapper extends KeyWrapper {
+    int hashcode;
+    Object key;
+    boolean isCopy;
+
+    public TextKeyWrapper(boolean isCopy) {
+      this(-1, null, isCopy);
+    }
+
+    private TextKeyWrapper(int hashcode, Object key,
+        boolean isCopy) {
+      super();
+      this.hashcode = hashcode;
+      this.key = key;
+      this.isCopy = isCopy;
+    }
+
+    @Override
+    public int hashCode() {
+      return hashcode;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      Object obj = ((TextKeyWrapper) other).key;
+      Text t1;
+      Text t2;
+      if (isCopy) {
+        t1 =  soi_copy.getPrimitiveWritableObject(key);
+        t2 =  soi_copy.getPrimitiveWritableObject(obj);
+      } else {
+        t1 = soi_new.getPrimitiveWritableObject(key);
+        t2 = soi_copy.getPrimitiveWritableObject(obj);
+      }
+      if (t1 == null && t2 == null) {
+        return true;
+      } else if (t1 == null || t2 == null) {
+        return false;
+      } else {
+        return t1.equals(t2);
+      }
+    }
+
+    @Override
+    public void setHashKey() {
+      if (key == null) {
+        hashcode = 0;
+      } else{
+        hashcode = key.hashCode();
+      }
+    }
+
+    @Override
+    public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+      // Compute the keys
+      key = keyFields[0].evaluate(row);
+    }
+
+    @Override
+    public KeyWrapper copyKey() {
+      return new TextKeyWrapper(hashcode, ObjectInspectorUtils.copyToStandardObject(key,
+          soi_new, ObjectInspectorCopyOption.WRITABLE), true);
+    }
+
+    @Override
+    public void copyKey(KeyWrapper oldWrapper) {
+      TextKeyWrapper textWrapper = (TextKeyWrapper) oldWrapper;
+      hashcode = textWrapper.hashcode;
+      isCopy = true;
+      key = ObjectInspectorUtils.copyToStandardObject(textWrapper.key,
+          soi_new, ObjectInspectorCopyOption.WRITABLE);
+    }
+
+    @Override
+    public Object[] getKeyArray() {
+      singleEleArray[0] = key;
+      return singleEleArray;
+    }
+  }
+}

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java?rev=1033824&r1=1033823&r2=1033824&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java Thu Nov 11 06:46:22 2010
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.serde2.objectinspector;
 
-import java.util.ArrayList;
-
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
@@ -155,15 +153,15 @@ public class ListObjectsEqualComparer {
    * @param ol1
    * @return True if object in ol0 and ol1 are all identical
    */
-  public boolean areEqual(ArrayList<Object> ol0, ArrayList<Object> ol1) {
-    if (ol0.size() != numFields || ol1.size() != numFields) {
-      if (ol0.size() != ol1.size()) {
+  public boolean areEqual(Object[] ol0, Object[] ol1) {
+    if (ol0.length != numFields || ol1.length != numFields) {
+      if (ol0.length != ol1.length) {
         return false;
       }
-      assert (ol0.size() <= numFields);
-      assert (ol1.size() <= numFields);
-      for (int i = 0; i < Math.min(ol0.size(), ol1.size()); i++) {
-        if (!fieldComparers[i].areEqual(ol0.get(i), ol1.get(i))) {
+      assert (ol0.length <= numFields);
+      assert (ol1.length <= numFields);
+      for (int i = 0; i < Math.min(ol0.length, ol1.length); i++) {
+        if (!fieldComparers[i].areEqual(ol0[i], ol1[i])) {
           return false;
         }
       }
@@ -171,7 +169,7 @@ public class ListObjectsEqualComparer {
     }
 
     for (int i = numFields - 1; i >= 0; i--) {
-      if (!fieldComparers[i].areEqual(ol0.get(i), ol1.get(i))) {
+      if (!fieldComparers[i].areEqual(ol0[i], ol1[i])) {
         return false;
       }
     }