You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/16 17:52:43 UTC

svn commit: r1514760 [2/2] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mapjoin/ java/org/apache/hadoop/hive/ql/exec/mr/ java/org/apache/hadoop/hive/ql/exec/persistence/ test/org/apache/hadoop/hive/q...

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Constructor;
+import java.util.ConcurrentModificationException;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Writable;
+
+@SuppressWarnings("deprecation")
+public class MapJoinTableContainerSerDe {
+  
+  private final MapJoinObjectSerDeContext keyContext;
+  private final MapJoinObjectSerDeContext valueContext;
+  public MapJoinTableContainerSerDe(MapJoinObjectSerDeContext keyContext,
+      MapJoinObjectSerDeContext valueContext) {
+    this.keyContext = keyContext;
+    this.valueContext = valueContext;
+  }
+  @SuppressWarnings({"unchecked"})
+  public MapJoinTableContainer load(ObjectInputStream in) 
+      throws HiveException {
+    SerDe keySerDe = keyContext.getSerDe();
+    SerDe valueSerDe = valueContext.getSerDe();
+    MapJoinTableContainer tableContainer;
+    try {
+      String name = in.readUTF();
+      Map<String, String> metaData = (Map<String, String>) in.readObject();
+      tableContainer = create(name, metaData);      
+    } catch (IOException e) {
+      throw new HiveException("IO error while trying to create table container", e);
+    } catch (ClassNotFoundException e) {
+      throw new HiveException("Class Initialization error while trying to create table container", e);
+    }
+    try {
+      Writable keyContainer = keySerDe.getSerializedClass().newInstance();
+      Writable valueContainer = valueSerDe.getSerializedClass().newInstance();    
+      int numKeys = in.readInt();
+      for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+        MapJoinKey key = new MapJoinKey();
+        key.read(keyContext, in, keyContainer);
+        MapJoinRowContainer values = new MapJoinRowContainer();
+        values.read(valueContext, in, valueContainer);
+        tableContainer.put(key, values);
+      }
+      return tableContainer;
+    } catch (IOException e) {
+      throw new HiveException("IO error while trying to create table container", e);
+    } catch(Exception e) {
+      throw new HiveException("Error while trying to create table container", e);
+    }
+  }
+  public void persist(ObjectOutputStream out, MapJoinTableContainer tableContainer)
+      throws HiveException {
+    int numKeys = tableContainer.size();
+    try { 
+      out.writeUTF(tableContainer.getClass().getName());
+      out.writeObject(tableContainer.getMetaData());
+      out.writeInt(numKeys);
+      for(Map.Entry<MapJoinKey, MapJoinRowContainer> entry : tableContainer.entrySet()) {
+        entry.getKey().write(keyContext, out);
+        entry.getValue().write(valueContext, out);
+      }
+    } catch (SerDeException e) {
+      String msg = "SerDe error while attempting to persist table container";
+      throw new HiveException(msg, e);
+    } catch(IOException e) {
+      String msg = "IO error while attempting to persist table container";
+      throw new HiveException(msg, e);
+    }
+    if(numKeys != tableContainer.size()) {
+      throw new ConcurrentModificationException("TableContainer was modified while persisting: " + tableContainer);
+    }
+  }
+  
+  public static void persistDummyTable(ObjectOutputStream out) throws IOException {
+    MapJoinTableContainer tableContainer = new HashMapWrapper();
+    out.writeUTF(tableContainer.getClass().getName());
+    out.writeObject(tableContainer.getMetaData());
+    out.writeInt(tableContainer.size());
+  }
+  
+  /** Loads class {@code name} and builds it via its {@code (Map)} constructor; any failure is wrapped in a HiveException. */
+  private MapJoinTableContainer create(String name, Map<String, String> metaData) throws HiveException {
+    try {
+      // asSubclass rejects a class that is not a MapJoinTableContainer before any constructor is invoked.
+      Class<? extends MapJoinTableContainer> clazz = Class.forName(name).asSubclass(MapJoinTableContainer.class);
+      Constructor<? extends MapJoinTableContainer> constructor = clazz.getDeclaredConstructor(Map.class);
+      return constructor.newInstance(metaData);
+    } catch (Exception e) {
+      String msg = "Error while attempting to create table container of type: " + name + ", with metaData: " + metaData;
+      throw new HiveException(msg, e);
+    }
+  }
+}
\ No newline at end of file

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Fri Aug 16 15:52:42 2013
@@ -71,18 +71,18 @@ import org.apache.hadoop.util.Reflection
  * reading.
  *
  */
-public class RowContainer<Row extends List<Object>> extends AbstractRowContainer<Row> {
+public class RowContainer<ROW extends List<Object>> extends AbstractRowContainer<ROW> {
 
   protected static Log LOG = LogFactory.getLog(RowContainer.class);
 
   // max # of rows can be put into one block
   private static final int BLOCKSIZE = 25000;
 
-  private Row[] currentWriteBlock; // the last block that add() should append to
-  private Row[] currentReadBlock; // the current block where the cursor is in
+  private ROW[] currentWriteBlock; // the last block that add() should append to
+  private ROW[] currentReadBlock; // the current block where the cursor is in
   // since currentReadBlock may assigned to currentWriteBlock, we need to store
   // original read block
-  private Row[] firstReadBlockPointer;
+  private ROW[] firstReadBlockPointer;
   private int blockSize; // number of objects in the block before it is spilled
   // to disk
   private int numFlushedBlocks; // total # of blocks
@@ -108,7 +108,7 @@ public class RowContainer<Row extends Li
   RecordWriter rw = null;
   InputFormat<WritableComparable, Writable> inputFormat = null;
   InputSplit[] inputSplits = null;
-  private Row dummyRow = null;
+  private ROW dummyRow = null;
   private final Reporter reporter;
 
   Writable val = null; // cached to use serialize data
@@ -130,7 +130,7 @@ public class RowContainer<Row extends Li
     this.addCursor = 0;
     this.numFlushedBlocks = 0;
     this.tmpFile = null;
-    this.currentWriteBlock = (Row[]) new ArrayList[blockSize];
+    this.currentWriteBlock = (ROW[]) new ArrayList[blockSize];
     this.currentReadBlock = this.currentWriteBlock;
     this.firstReadBlockPointer = currentReadBlock;
     this.serde = null;
@@ -158,13 +158,13 @@ public class RowContainer<Row extends Li
   }
 
   @Override
-  public void add(Row t) throws HiveException {
+  public void add(ROW t) throws HiveException {
     if (this.tblDesc != null) {
       if (addCursor >= blockSize) { // spill the current block to tmp file
         spillBlock(currentWriteBlock, addCursor);
         addCursor = 0;
         if (numFlushedBlocks == 1) {
-          currentWriteBlock = (Row[]) new ArrayList[blockSize];
+          currentWriteBlock = (ROW[]) new ArrayList[blockSize];
         }
       }
       currentWriteBlock[addCursor++] = t;
@@ -178,7 +178,7 @@ public class RowContainer<Row extends Li
   }
 
   @Override
-  public Row first() throws HiveException {
+  public ROW first() throws HiveException {
     if (size == 0) {
       return null;
     }
@@ -224,7 +224,7 @@ public class RowContainer<Row extends Li
         nextBlock();
       }
       // we are guaranteed that we can get data here (since 'size' is not zero)
-      Row ret = currentReadBlock[itrCursor++];
+      ROW ret = currentReadBlock[itrCursor++];
       removeKeys(ret);
       return ret;
     } catch (Exception e) {
@@ -234,7 +234,7 @@ public class RowContainer<Row extends Li
   }
 
   @Override
-  public Row next() throws HiveException {
+  public ROW next() throws HiveException {
 
     if (!firstCalled) {
       throw new RuntimeException("Call first() then call next().");
@@ -252,7 +252,7 @@ public class RowContainer<Row extends Li
       return null;
     }
 
-    Row ret;
+    ROW ret;
     if (itrCursor < this.readBlockSize) {
       ret = this.currentReadBlock[itrCursor++];
       removeKeys(ret);
@@ -273,7 +273,7 @@ public class RowContainer<Row extends Li
     }
   }
 
-  private void removeKeys(Row ret) {
+  private void removeKeys(ROW ret) {
     if (this.keyObject != null && this.currentReadBlock != this.currentWriteBlock) {
       int len = this.keyObject.size();
       int rowSize = ((ArrayList) ret).size();
@@ -285,7 +285,7 @@ public class RowContainer<Row extends Li
 
   private final ArrayList<Object> row = new ArrayList<Object>(2);
 
-  private void spillBlock(Row[] block, int length) throws HiveException {
+  private void spillBlock(ROW[] block, int length) throws HiveException {
     try {
       if (tmpFile == null) {
 
@@ -329,14 +329,14 @@ public class RowContainer<Row extends Li
       if (this.keyObject != null) {
         row.set(1, this.keyObject);
         for (int i = 0; i < length; ++i) {
-          Row currentValRow = block[i];
+          ROW currentValRow = block[i];
           row.set(0, currentValRow);
           Writable outVal = serde.serialize(row, standardOI);
           rw.write(outVal);
         }
       } else {
         for (int i = 0; i < length; ++i) {
-          Row currentValRow = block[i];
+          ROW currentValRow = block[i];
           Writable outVal = serde.serialize(currentValRow, standardOI);
           rw.write(outVal);
         }
@@ -382,7 +382,7 @@ public class RowContainer<Row extends Li
         Object key = rr.createKey();
         while (i < this.currentReadBlock.length && rr.next(key, val)) {
           nextSplit = false;
-          this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(serde
+          this.currentReadBlock[i++] = (ROW) ObjectInspectorUtils.copyToStandardObject(serde
               .deserialize(val), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
         }
       }

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMapJoinMemoryExhaustionHandler {
+  private static final Log LOG = LogFactory.getLog(TestMapJoinMemoryExhaustionHandler.class);
+
+  private LogHelper logHelper;
+  
+  @Before
+  public void setup() {
+    logHelper = new LogHelper(LOG);
+  }
+  @Test(expected=MapJoinMemoryExhaustionException.class)
+  public void testAbort() throws MapJoinMemoryExhaustionException {
+    MapJoinMemoryExhaustionHandler handler = new MapJoinMemoryExhaustionHandler(logHelper, 0.01d);
+    List<byte[]> memoryConsumer = new ArrayList<byte[]>();
+    while(true) {
+      handler.checkMemoryStatus(1, 1);
+      memoryConsumer.add(new byte[5 * 1024 * 1024]);
+    }
+  }
+}

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMapJoinEqualityTableContainer {
+  
+  private static final MapJoinKey KEY1 = new MapJoinKey(new Object[] {new Text("key1")});
+  private static final MapJoinKey KEY2 = new MapJoinKey(new Object[] {new Text("key2")});
+  private static final MapJoinKey KEY3 = new MapJoinKey(new Object[] {new Text("key3")});
+  private static final MapJoinKey KEY4 = new MapJoinKey(new Object[] {new Text("key4")});
+  private static final Object[] VALUE = new Object[] {new Text("value")};
+  private MapJoinTableContainer container;
+  private MapJoinRowContainer rowContainer;
+  @Before
+  public void setup() throws Exception {
+    rowContainer = new MapJoinRowContainer();
+    rowContainer.add(VALUE);
+    container = new HashMapWrapper();
+  }
+  @Test
+  public void testContainerBasics() throws Exception {
+    container.put(KEY1, rowContainer);
+    container.put(KEY2, rowContainer);
+    container.put(KEY3, rowContainer);
+    container.put(KEY4, rowContainer);
+    Assert.assertEquals(4, container.size());
+    Map<MapJoinKey, MapJoinRowContainer> localContainer = new HashMap<MapJoinKey, MapJoinRowContainer>();
+    for(Entry<MapJoinKey, MapJoinRowContainer> entry : container.entrySet()) {
+      localContainer.put(entry.getKey(), entry.getValue());
+    }
+    Utilities.testEquality(container.get(KEY1), localContainer.get(KEY1));
+    Utilities.testEquality(container.get(KEY2), localContainer.get(KEY2));
+    Utilities.testEquality(container.get(KEY3), localContainer.get(KEY3));
+    Utilities.testEquality(container.get(KEY4), localContainer.get(KEY4));
+    container.clear();
+    Assert.assertEquals(0, container.size());
+    Assert.assertTrue(container.entrySet().isEmpty());
+  }
+}
\ No newline at end of file

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class TestMapJoinKey {
+
+  @Test
+  public void testEqualityHashCode() throws Exception {
+    MapJoinKey key1 = new MapJoinKey(new String[] {"key"});
+    MapJoinKey key2 = new MapJoinKey(new String[] {"key"});
+    Utilities.testEquality(key1, key2);
+    key1 = new MapJoinKey(new Object[] {148, null});
+    key2 = new MapJoinKey(new Object[] {148, null});
+    Utilities.testEquality(key1, key2);
+    key1 = new MapJoinKey(new Object[] {null, "key1"});
+    key2 = new MapJoinKey(new Object[] {null, "key2"});
+    Assert.assertFalse(key1.equals(key2));
+  }
+  @Test
+  public void testHasAnyNulls() throws Exception {
+    MapJoinKey key = new MapJoinKey(new String[] {"key", null});
+    Assert.assertTrue(key.hasAnyNulls(null));
+    // field 1 is not null safe
+    Assert.assertTrue(key.hasAnyNulls(new boolean[] { false, false }));
+    // field 1 is null safe
+    Assert.assertFalse(key.hasAnyNulls(new boolean[] { false, true }));
+    Assert.assertFalse(key.hasAnyNulls(new boolean[] { true, true }));
+  }
+  @Test
+  public void testSerialization() throws Exception {
+    MapJoinKey key1 = new MapJoinKey(new Object[] {new Text("field0"), null, new Text("field2")});
+    MapJoinKey key2 = Utilities.serde(key1, "f0,f1,f2", "string,string,string");
+    Utilities.testEquality(key1, key2);
+  }
+}

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.Arrays;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class TestMapJoinRowContainer {
+  
+  @Test
+  public void testSerialization() throws Exception {
+    MapJoinRowContainer container1 = new MapJoinRowContainer();
+    container1.add(new Object[]{ new Text("f0"), null, new ShortWritable((short)0xf)});
+    container1.add(Arrays.asList(new Object[]{ null, new Text("f1"), new ShortWritable((short)0xf)}));
+    container1.add(new Object[]{ null, null, new ShortWritable((short)0xf)});
+    container1.add(Arrays.asList(new Object[]{ new Text("f0"), new Text("f1"), new ShortWritable((short)0x1)}));
+    MapJoinRowContainer container2 = Utilities.serde(container1, "f0,f1,filter", "string,string,smallint");
+    Utilities.testEquality(container1, container2);
+    Assert.assertEquals(4, container1.size());
+    Assert.assertEquals(1, container2.getAliasFilter());
+  }
+
+}

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMapJoinTableContainer {
+  
+  private static final Object[] KEY = new Object[] {new Text("key")};
+  private static final Object[] VALUE = new Object[] {new Text("value")};
+  private ByteArrayOutputStream baos;
+  private ObjectOutputStream out;
+  private ObjectInputStream in;
+  private MapJoinTableContainer container;
+  private MapJoinTableContainerSerDe containerSerde;
+  private MapJoinKey key;
+  private MapJoinRowContainer rowContainer;
+  /** Builds the key/row fixtures, the in-memory object stream and the container SerDe before each test. */
+  @Before
+  public void setup() throws Exception {
+    key = new MapJoinKey(KEY);
+    rowContainer = new MapJoinRowContainer();
+    rowContainer.add(VALUE);
+    baos = new ByteArrayOutputStream();
+    out = new ObjectOutputStream(baos);
+    // Key and value SerDes are each initialized from their own column name/type properties.
+    LazyBinarySerDe keySerde = new LazyBinarySerDe();
+    Properties keyProps = new Properties();
+    keyProps.put(serdeConstants.LIST_COLUMNS, "v1");
+    keyProps.put(serdeConstants.LIST_COLUMN_TYPES, "string");
+    keySerde.initialize(null, keyProps);
+    LazyBinarySerDe valueSerde = new LazyBinarySerDe();
+    Properties valueProps = new Properties();
+    valueProps.put(serdeConstants.LIST_COLUMNS, "v1");
+    valueProps.put(serdeConstants.LIST_COLUMN_TYPES, "string");
+    valueSerde.initialize(null, valueProps);
+    containerSerde = new MapJoinTableContainerSerDe(new MapJoinObjectSerDeContext(keySerde, false),
+        new MapJoinObjectSerDeContext(valueSerde, false));
+    container = new HashMapWrapper();
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    container.put(key, rowContainer);
+    containerSerde.persist(out, container);
+    out.close();
+    in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    container = containerSerde.load(in);
+    Utilities.testEquality(rowContainer, container.get(key));
+  }
+  @Test
+  public void testDummyContainer() throws Exception {
+    MapJoinTableContainerSerDe.persistDummyTable(out);
+    out.close();
+    in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    container = containerSerde.load(in);
+    Assert.assertEquals(0, container.size());
+    Assert.assertTrue(container.entrySet().isEmpty());
+  }  
+}
\ No newline at end of file

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.io.BytesWritable;
+
+class Utilities {
+
+  static void testEquality(MapJoinKey key1, MapJoinKey key2) {
+    Assert.assertEquals(key1.hashCode(), key2.hashCode());
+    Assert.assertEquals(key1, key2);
+    Assert.assertEquals(key1.getKey().length, key2.getKey().length);
+    int length = key1.getKey().length;
+    for (int i = 0; i <length; i++) {
+      Assert.assertEquals(key1.getKey()[i], key2.getKey()[i]); 
+    }
+  }
+  
+  static MapJoinKey serde(MapJoinKey key, String columns, String types) 
+  throws Exception {
+    MapJoinKey result = new MapJoinKey();
+    ByteArrayInputStream bais;
+    ObjectInputStream in;
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(baos);
+    LazyBinarySerDe serde = new LazyBinarySerDe();
+    Properties props = new Properties();
+    props.put(serdeConstants.LIST_COLUMNS, columns);
+    props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+    serde.initialize(null, props);
+    MapJoinObjectSerDeContext context = new MapJoinObjectSerDeContext(serde, false);    
+    key.write(context, out);
+    out.close();
+    bais = new ByteArrayInputStream(baos.toByteArray());
+    in = new ObjectInputStream(bais);
+    result.read(context, in, new BytesWritable());
+    return result;
+  }
+  
+  
+  static void testEquality(MapJoinRowContainer container1, MapJoinRowContainer container2) {
+    Assert.assertEquals(container1.size(), container2.size());
+    List<Object> row1 = container1.first();
+    List<Object> row2 = container2.first();
+    for (; row1 != null && row2 != null; row1 = container1.next(), row2 = container2.next()) {
+      Assert.assertEquals(row1, row2);
+    }
+  }
+  
+  static MapJoinRowContainer serde(MapJoinRowContainer container, String columns, String types) 
+  throws Exception {
+    MapJoinRowContainer result = new MapJoinRowContainer();
+    ByteArrayInputStream bais;
+    ObjectInputStream in;
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(baos);
+    LazyBinarySerDe serde = new LazyBinarySerDe();
+    Properties props = new Properties();
+    props.put(serdeConstants.LIST_COLUMNS, columns);
+    props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+    serde.initialize(null, props);
+    MapJoinObjectSerDeContext context = new MapJoinObjectSerDeContext(serde, true);    
+    container.write(context, out);
+    out.close();
+    bais = new ByteArrayInputStream(baos.toByteArray());
+    in = new ObjectInputStream(bais);
+    result.read(context, in, new BytesWritable());
+    return result;
+  }
+}