You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/16 17:52:43 UTC
svn commit: r1514760 [2/2] - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/exec/mapjoin/
java/org/apache/hadoop/hive/ql/exec/mr/
java/org/apache/hadoop/hive/ql/exec/persistence/
test/org/apache/hadoop/hive/q...
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Constructor;
+import java.util.ConcurrentModificationException;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Serializes {@link MapJoinTableContainer} instances to, and restores them from,
+ * Java object streams.
+ *
+ * <p>The stream layout written by {@code persist} and read back by {@code load} is:
+ * the container's class name (UTF string), its metadata map (serialized object),
+ * the number of entries (int), then one key followed by one row container per entry.
+ */
+@SuppressWarnings("deprecation")
+public class MapJoinTableContainerSerDe {
+
+  private final MapJoinObjectSerDeContext keyContext;
+  private final MapJoinObjectSerDeContext valueContext;
+
+  /**
+   * @param keyContext context handed to every key read and write
+   * @param valueContext context handed to every row container read and write
+   */
+  public MapJoinTableContainerSerDe(MapJoinObjectSerDeContext keyContext,
+      MapJoinObjectSerDeContext valueContext) {
+    this.keyContext = keyContext;
+    this.valueContext = valueContext;
+  }
+
+  /**
+   * Rebuilds a table container from a stream produced by {@code persist} or
+   * {@code persistDummyTable}.
+   *
+   * @param in stream positioned at the start of a persisted container
+   * @return a new container of the persisted class holding every persisted entry
+   * @throws HiveException if the header cannot be read, the container cannot be
+   *         instantiated, or any entry fails to load
+   */
+  @SuppressWarnings({"unchecked"})
+  public MapJoinTableContainer load(ObjectInputStream in)
+      throws HiveException {
+    SerDe keySerDe = keyContext.getSerDe();
+    SerDe valueSerDe = valueContext.getSerDe();
+    MapJoinTableContainer tableContainer;
+    try {
+      // Header: concrete container class name followed by its metadata map.
+      String name = in.readUTF();
+      Map<String, String> metaData = (Map<String, String>) in.readObject();
+      tableContainer = create(name, metaData);
+    } catch (IOException e) {
+      throw new HiveException("IO error while trying to create table container", e);
+    } catch (ClassNotFoundException e) {
+      throw new HiveException("Class Initialization error while trying to create table container", e);
+    }
+    try {
+      // A single Writable per side is instantiated once and passed to every read call.
+      Writable keyContainer = keySerDe.getSerializedClass().newInstance();
+      Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
+      int numKeys = in.readInt();
+      for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+        MapJoinKey key = new MapJoinKey();
+        key.read(keyContext, in, keyContainer);
+        MapJoinRowContainer values = new MapJoinRowContainer();
+        values.read(valueContext, in, valueContainer);
+        tableContainer.put(key, values);
+      }
+      return tableContainer;
+    } catch (IOException e) {
+      throw new HiveException("IO error while trying to create table container", e);
+    } catch (Exception e) {
+      // Covers reflection failures from newInstance() and anything thrown while reading entries.
+      throw new HiveException("Error while trying to create table container", e);
+    }
+  }
+
+  /**
+   * Writes the container's class name, metadata, entry count and all entries to {@code out}.
+   *
+   * @param out destination stream
+   * @param tableContainer container to write
+   * @throws HiveException if serialization or stream I/O fails
+   * @throws ConcurrentModificationException if the container's size after writing differs
+   *         from the size that was recorded in the stream
+   */
+  public void persist(ObjectOutputStream out, MapJoinTableContainer tableContainer)
+      throws HiveException {
+    // Captured up front: this is the count written to the stream and re-checked afterwards.
+    int numKeys = tableContainer.size();
+    try {
+      out.writeUTF(tableContainer.getClass().getName());
+      out.writeObject(tableContainer.getMetaData());
+      out.writeInt(numKeys);
+      for (Map.Entry<MapJoinKey, MapJoinRowContainer> entry : tableContainer.entrySet()) {
+        entry.getKey().write(keyContext, out);
+        entry.getValue().write(valueContext, out);
+      }
+    } catch (SerDeException e) {
+      String msg = "SerDe error while attempting to persist table container";
+      throw new HiveException(msg, e);
+    } catch (IOException e) {
+      String msg = "IO error while attempting to persist table container";
+      throw new HiveException(msg, e);
+    }
+    if (numKeys != tableContainer.size()) {
+      throw new ConcurrentModificationException("TableContainer was modified while persisting: " + tableContainer);
+    }
+  }
+
+  /**
+   * Writes the header of a freshly constructed {@link HashMapWrapper} (class name,
+   * metadata and its size) so that {@code load} can read it back as a container.
+   *
+   * @param out destination stream
+   * @throws IOException if writing to the stream fails
+   */
+  public static void persistDummyTable(ObjectOutputStream out) throws IOException {
+    MapJoinTableContainer tableContainer = new HashMapWrapper();
+    out.writeUTF(tableContainer.getClass().getName());
+    out.writeObject(tableContainer.getMetaData());
+    out.writeInt(tableContainer.size());
+  }
+
+  /**
+   * Reflectively instantiates the named container class through its
+   * single-argument {@code Map} constructor.
+   *
+   * @param name fully qualified class name read from the stream
+   * @param metaData metadata map passed to the constructor
+   * @return the new container
+   * @throws HiveException wrapping any failure to load or instantiate the class
+   */
+  private MapJoinTableContainer create(String name, Map<String, String> metaData) throws HiveException {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends MapJoinTableContainer> clazz = (Class<? extends MapJoinTableContainer>) Class.forName(name);
+      Constructor<? extends MapJoinTableContainer> constructor = clazz.getDeclaredConstructor(Map.class);
+      return constructor.newInstance(metaData);
+    } catch (Exception e) {
+      String msg = "Error while attempting to create table container" +
+          " of type: " + name + ", with metaData: " + metaData;
+      throw new HiveException(msg, e);
+    }
+  }
+}
\ No newline at end of file
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1514760&r1=1514759&r2=1514760&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Fri Aug 16 15:52:42 2013
@@ -71,18 +71,18 @@ import org.apache.hadoop.util.Reflection
* reading.
*
*/
-public class RowContainer<Row extends List<Object>> extends AbstractRowContainer<Row> {
+public class RowContainer<ROW extends List<Object>> extends AbstractRowContainer<ROW> {
protected static Log LOG = LogFactory.getLog(RowContainer.class);
// max # of rows can be put into one block
private static final int BLOCKSIZE = 25000;
- private Row[] currentWriteBlock; // the last block that add() should append to
- private Row[] currentReadBlock; // the current block where the cursor is in
+ private ROW[] currentWriteBlock; // the last block that add() should append to
+ private ROW[] currentReadBlock; // the current block where the cursor is in
// since currentReadBlock may assigned to currentWriteBlock, we need to store
// original read block
- private Row[] firstReadBlockPointer;
+ private ROW[] firstReadBlockPointer;
private int blockSize; // number of objects in the block before it is spilled
// to disk
private int numFlushedBlocks; // total # of blocks
@@ -108,7 +108,7 @@ public class RowContainer<Row extends Li
RecordWriter rw = null;
InputFormat<WritableComparable, Writable> inputFormat = null;
InputSplit[] inputSplits = null;
- private Row dummyRow = null;
+ private ROW dummyRow = null;
private final Reporter reporter;
Writable val = null; // cached to use serialize data
@@ -130,7 +130,7 @@ public class RowContainer<Row extends Li
this.addCursor = 0;
this.numFlushedBlocks = 0;
this.tmpFile = null;
- this.currentWriteBlock = (Row[]) new ArrayList[blockSize];
+ this.currentWriteBlock = (ROW[]) new ArrayList[blockSize];
this.currentReadBlock = this.currentWriteBlock;
this.firstReadBlockPointer = currentReadBlock;
this.serde = null;
@@ -158,13 +158,13 @@ public class RowContainer<Row extends Li
}
@Override
- public void add(Row t) throws HiveException {
+ public void add(ROW t) throws HiveException {
if (this.tblDesc != null) {
if (addCursor >= blockSize) { // spill the current block to tmp file
spillBlock(currentWriteBlock, addCursor);
addCursor = 0;
if (numFlushedBlocks == 1) {
- currentWriteBlock = (Row[]) new ArrayList[blockSize];
+ currentWriteBlock = (ROW[]) new ArrayList[blockSize];
}
}
currentWriteBlock[addCursor++] = t;
@@ -178,7 +178,7 @@ public class RowContainer<Row extends Li
}
@Override
- public Row first() throws HiveException {
+ public ROW first() throws HiveException {
if (size == 0) {
return null;
}
@@ -224,7 +224,7 @@ public class RowContainer<Row extends Li
nextBlock();
}
// we are guaranteed that we can get data here (since 'size' is not zero)
- Row ret = currentReadBlock[itrCursor++];
+ ROW ret = currentReadBlock[itrCursor++];
removeKeys(ret);
return ret;
} catch (Exception e) {
@@ -234,7 +234,7 @@ public class RowContainer<Row extends Li
}
@Override
- public Row next() throws HiveException {
+ public ROW next() throws HiveException {
if (!firstCalled) {
throw new RuntimeException("Call first() then call next().");
@@ -252,7 +252,7 @@ public class RowContainer<Row extends Li
return null;
}
- Row ret;
+ ROW ret;
if (itrCursor < this.readBlockSize) {
ret = this.currentReadBlock[itrCursor++];
removeKeys(ret);
@@ -273,7 +273,7 @@ public class RowContainer<Row extends Li
}
}
- private void removeKeys(Row ret) {
+ private void removeKeys(ROW ret) {
if (this.keyObject != null && this.currentReadBlock != this.currentWriteBlock) {
int len = this.keyObject.size();
int rowSize = ((ArrayList) ret).size();
@@ -285,7 +285,7 @@ public class RowContainer<Row extends Li
private final ArrayList<Object> row = new ArrayList<Object>(2);
- private void spillBlock(Row[] block, int length) throws HiveException {
+ private void spillBlock(ROW[] block, int length) throws HiveException {
try {
if (tmpFile == null) {
@@ -329,14 +329,14 @@ public class RowContainer<Row extends Li
if (this.keyObject != null) {
row.set(1, this.keyObject);
for (int i = 0; i < length; ++i) {
- Row currentValRow = block[i];
+ ROW currentValRow = block[i];
row.set(0, currentValRow);
Writable outVal = serde.serialize(row, standardOI);
rw.write(outVal);
}
} else {
for (int i = 0; i < length; ++i) {
- Row currentValRow = block[i];
+ ROW currentValRow = block[i];
Writable outVal = serde.serialize(currentValRow, standardOI);
rw.write(outVal);
}
@@ -382,7 +382,7 @@ public class RowContainer<Row extends Li
Object key = rr.createKey();
while (i < this.currentReadBlock.length && rr.next(key, val)) {
nextSplit = false;
- this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(serde
+ this.currentReadBlock[i++] = (ROW) ObjectInspectorUtils.copyToStandardObject(serde
.deserialize(val), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
}
}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link MapJoinMemoryExhaustionHandler} aborts with a
+ * {@link MapJoinMemoryExhaustionException} while the test keeps retaining memory.
+ */
+public class TestMapJoinMemoryExhaustionHandler {
+  private static final Log LOG = LogFactory.getLog(TestMapJoinMemoryExhaustionHandler.class);
+
+  private LogHelper logHelper;
+
+  @Before
+  public void setup() {
+    logHelper = new LogHelper(LOG);
+  }
+
+  /**
+   * Alternates a memory status check with retaining another 5 MB array; the loop has no
+   * exit of its own, so the test only finishes when the expected exception is thrown.
+   */
+  @Test(expected=MapJoinMemoryExhaustionException.class)
+  public void testAbort() throws MapJoinMemoryExhaustionException {
+    // NOTE(review): 0.01d is presumably the allowed memory-usage fraction - confirm
+    // against the MapJoinMemoryExhaustionHandler constructor.
+    MapJoinMemoryExhaustionHandler exhaustionHandler =
+        new MapJoinMemoryExhaustionHandler(logHelper, 0.01d);
+    // Holding references keeps the allocated arrays reachable.
+    List<byte[]> retainedChunks = new ArrayList<byte[]>();
+    for (;;) {
+      exhaustionHandler.checkMemoryStatus(1, 1);
+      byte[] chunk = new byte[5 * 1024 * 1024];
+      retainedChunks.add(chunk);
+    }
+  }
+}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises put, get, size, entrySet and clear on a {@link HashMapWrapper}
+ * accessed through the {@link MapJoinTableContainer} type.
+ */
+public class TestMapJoinEqualityTableContainer {
+
+  private static final MapJoinKey KEY1 = new MapJoinKey(new Object[] {new Text("key1")});
+  private static final MapJoinKey KEY2 = new MapJoinKey(new Object[] {new Text("key2")});
+  private static final MapJoinKey KEY3 = new MapJoinKey(new Object[] {new Text("key3")});
+  private static final MapJoinKey KEY4 = new MapJoinKey(new Object[] {new Text("key4")});
+  private static final Object[] VALUE = new Object[] {new Text("value")};
+
+  private MapJoinTableContainer container;
+  private MapJoinRowContainer rowContainer;
+
+  @Before
+  public void setup() throws Exception {
+    rowContainer = new MapJoinRowContainer();
+    rowContainer.add(VALUE);
+    container = new HashMapWrapper();
+  }
+
+  /**
+   * Stores the same row container under four distinct keys, mirrors the entry set into a
+   * plain {@link HashMap}, compares lookups from both, then verifies that clear() empties
+   * the container.
+   */
+  @Test
+  public void testContainerBasics() throws Exception {
+    MapJoinKey[] keys = {KEY1, KEY2, KEY3, KEY4};
+    for (MapJoinKey key : keys) {
+      container.put(key, rowContainer);
+    }
+    Assert.assertEquals(4, container.size());
+
+    // Copy every entry exposed by entrySet() into an ordinary map.
+    Map<MapJoinKey, MapJoinRowContainer> mirror = new HashMap<MapJoinKey, MapJoinRowContainer>();
+    for (Entry<MapJoinKey, MapJoinRowContainer> e : container.entrySet()) {
+      mirror.put(e.getKey(), e.getValue());
+    }
+
+    for (MapJoinKey key : keys) {
+      Utilities.testEquality(container.get(key), mirror.get(key));
+    }
+
+    container.clear();
+    Assert.assertEquals(0, container.size());
+    Assert.assertTrue(container.entrySet().isEmpty());
+  }
+}
\ No newline at end of file
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link MapJoinKey}: equality and hash code, null detection,
+ * and a serialization round trip.
+ */
+public class TestMapJoinKey {
+
+  /** Keys built from equal field arrays must be equal; keys with a differing field must not. */
+  @Test
+  public void testEqualityHashCode() throws Exception {
+    MapJoinKey left = new MapJoinKey(new String[] {"key"});
+    MapJoinKey right = new MapJoinKey(new String[] {"key"});
+    Utilities.testEquality(left, right);
+
+    // Same fields, including a null in the same position.
+    left = new MapJoinKey(new Object[] {148, null});
+    right = new MapJoinKey(new Object[] {148, null});
+    Utilities.testEquality(left, right);
+
+    // Only the non-null field differs.
+    left = new MapJoinKey(new Object[] {null, "key1"});
+    right = new MapJoinKey(new Object[] {null, "key2"});
+    Assert.assertFalse(left.equals(right));
+  }
+
+  /** Checks hasAnyNulls for a key whose second field is null, under several null-safe masks. */
+  @Test
+  public void testHasAnyNulls() throws Exception {
+    MapJoinKey keyWithNull = new MapJoinKey(new String[] {"key", null});
+    Assert.assertTrue(keyWithNull.hasAnyNulls(null));
+
+    // field 1 is not null safe
+    boolean[] noneNullSafe = { false, false };
+    Assert.assertTrue(keyWithNull.hasAnyNulls(noneNullSafe));
+
+    // field 1 is null safe
+    boolean[] secondNullSafe = { false, true };
+    Assert.assertFalse(keyWithNull.hasAnyNulls(secondNullSafe));
+    boolean[] allNullSafe = { true, true };
+    Assert.assertFalse(keyWithNull.hasAnyNulls(allNullSafe));
+  }
+
+  /** A key with a null middle field must equal its copy after a write/read round trip. */
+  @Test
+  public void testSerialization() throws Exception {
+    Object[] fields = {new Text("field0"), null, new Text("field2")};
+    MapJoinKey original = new MapJoinKey(fields);
+    MapJoinKey restored = Utilities.serde(original, "f0,f1,f2", "string,string,string");
+    Utilities.testEquality(original, restored);
+  }
+}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.Arrays;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * Round-trips a {@link MapJoinRowContainer} through {@code Utilities.serde} and
+ * compares the result with the original.
+ */
+public class TestMapJoinRowContainer {
+
+  /**
+   * Adds four rows (two as arrays, two as lists) whose last column is a
+   * {@link ShortWritable}, serializes them with the column layout
+   * {@code f0,f1,filter}, and checks the restored rows, the row count and the
+   * alias filter of the restored container.
+   */
+  @Test
+  public void testSerialization() throws Exception {
+    MapJoinRowContainer source = new MapJoinRowContainer();
+    source.add(new Object[]{ new Text("f0"), null, new ShortWritable((short)0xf)});
+    source.add(Arrays.asList(new Object[]{ null, new Text("f1"), new ShortWritable((short)0xf)}));
+    source.add(new Object[]{ null, null, new ShortWritable((short)0xf)});
+    source.add(Arrays.asList(new Object[]{ new Text("f0"), new Text("f1"), new ShortWritable((short)0x1)}));
+
+    String columnNames = "f0,f1,filter";
+    String columnTypes = "string,string,smallint";
+    MapJoinRowContainer restored = Utilities.serde(source, columnNames, columnTypes);
+
+    Utilities.testEquality(source, restored);
+    Assert.assertEquals(4, source.size());
+    Assert.assertEquals(1, restored.getAliasFilter());
+  }
+
+}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for {@link MapJoinTableContainerSerDe}: a container with one entry,
+ * and the empty "dummy" container written by {@code persistDummyTable}.
+ */
+public class TestMapJoinTableContainer {
+
+  private static final Object[] KEY = new Object[] {new Text("key")};
+  private static final Object[] VALUE = new Object[] {new Text("value")};
+  private ByteArrayOutputStream baos;
+  private ObjectOutputStream out;
+  private ObjectInputStream in;
+  private MapJoinTableContainer container;
+  private MapJoinTableContainerSerDe containerSerde;
+  private MapJoinKey key;
+  private MapJoinRowContainer rowContainer;
+
+  /**
+   * Builds one key, one single-row row container, an in-memory output stream, and a
+   * container SerDe whose key and value sides each use a {@link LazyBinarySerDe}
+   * configured with a single string column named {@code v1}.
+   */
+  @Before
+  public void setup() throws Exception {
+    key = new MapJoinKey(KEY);
+    rowContainer = new MapJoinRowContainer();
+    rowContainer.add(VALUE);
+    baos = new ByteArrayOutputStream();
+    out = new ObjectOutputStream(baos);
+
+    LazyBinarySerDe keySerde = new LazyBinarySerDe();
+    Properties keyProps = new Properties();
+    keyProps.put(serdeConstants.LIST_COLUMNS, "v1");
+    keyProps.put(serdeConstants.LIST_COLUMN_TYPES, "string");
+    keySerde.initialize(null, keyProps);
+    LazyBinarySerDe valueSerde = new LazyBinarySerDe();
+    Properties valueProps = new Properties();
+    valueProps.put(serdeConstants.LIST_COLUMNS, "v1");
+    valueProps.put(serdeConstants.LIST_COLUMN_TYPES, "string");
+    // The value SerDe must be configured from its own properties, not the key's.
+    valueSerde.initialize(null, valueProps);
+    containerSerde = new MapJoinTableContainerSerDe(
+        new MapJoinObjectSerDeContext(keySerde, false),
+        new MapJoinObjectSerDeContext(valueSerde, false));
+    container = new HashMapWrapper();
+  }
+
+  /** Persists a one-entry container, loads it back and compares the stored rows. */
+  @Test
+  public void testSerialization() throws Exception {
+    container.put(key, rowContainer);
+    containerSerde.persist(out, container);
+    // Close before reading the buffer so everything written reaches the byte array.
+    out.close();
+    in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    container = containerSerde.load(in);
+    Utilities.testEquality(rowContainer, container.get(key));
+  }
+
+  /** A persisted dummy table must load as a container with no entries. */
+  @Test
+  public void testDummyContainer() throws Exception {
+    MapJoinTableContainerSerDe.persistDummyTable(out);
+    out.close();
+    in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    container = containerSerde.load(in);
+    Assert.assertEquals(0, container.size());
+    Assert.assertTrue(container.entrySet().isEmpty());
+  }
+}
\ No newline at end of file
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java?rev=1514760&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java Fri Aug 16 15:52:42 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * Shared assertions and serialization round-trip helpers for the map-join
+ * persistence tests in this package.
+ */
+class Utilities {
+
+  /**
+   * Asserts that two keys have the same hash code, are equal, and hold equal
+   * fields position by position.
+   */
+  static void testEquality(MapJoinKey key1, MapJoinKey key2) {
+    Assert.assertEquals(key1.hashCode(), key2.hashCode());
+    Assert.assertEquals(key1, key2);
+    Assert.assertEquals(key1.getKey().length, key2.getKey().length);
+    int length = key1.getKey().length;
+    for (int i = 0; i < length; i++) {
+      Assert.assertEquals(key1.getKey()[i], key2.getKey()[i]);
+    }
+  }
+
+  /**
+   * Writes {@code key} to an in-memory object stream and reads it back into a new key.
+   *
+   * @param key key to round-trip
+   * @param columns value for {@code serdeConstants.LIST_COLUMNS}
+   * @param types value for {@code serdeConstants.LIST_COLUMN_TYPES}
+   * @return the key read back from the serialized bytes
+   */
+  static MapJoinKey serde(MapJoinKey key, String columns, String types)
+      throws Exception {
+    MapJoinObjectSerDeContext context = createContext(columns, types, false);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(baos);
+    key.write(context, out);
+    // Closing flushes the object stream so the byte array holds the complete output.
+    out.close();
+    MapJoinKey result = new MapJoinKey();
+    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    try {
+      result.read(context, in, new BytesWritable());
+    } finally {
+      in.close();
+    }
+    return result;
+  }
+
+
+  /**
+   * Asserts that two row containers report the same size and yield equal rows in the
+   * same order, and that both iterations end together.
+   */
+  static void testEquality(MapJoinRowContainer container1, MapJoinRowContainer container2) {
+    Assert.assertEquals(container1.size(), container2.size());
+    List<Object> row1 = container1.first();
+    List<Object> row2 = container2.first();
+    for (; row1 != null && row2 != null; row1 = container1.next(), row2 = container2.next()) {
+      Assert.assertEquals(row1, row2);
+    }
+    // The loop stops as soon as either side runs out; a non-null row here means the
+    // other container still had rows that were never compared.
+    Assert.assertNull(row1);
+    Assert.assertNull(row2);
+  }
+
+  /**
+   * Writes {@code container} to an in-memory object stream and reads it back into a
+   * new row container.
+   *
+   * @param container row container to round-trip
+   * @param columns value for {@code serdeConstants.LIST_COLUMNS}
+   * @param types value for {@code serdeConstants.LIST_COLUMN_TYPES}
+   * @return the row container read back from the serialized bytes
+   */
+  static MapJoinRowContainer serde(MapJoinRowContainer container, String columns, String types)
+      throws Exception {
+    MapJoinObjectSerDeContext context = createContext(columns, types, true);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(baos);
+    container.write(context, out);
+    // Closing flushes the object stream so the byte array holds the complete output.
+    out.close();
+    MapJoinRowContainer result = new MapJoinRowContainer();
+    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    try {
+      result.read(context, in, new BytesWritable());
+    } finally {
+      in.close();
+    }
+    return result;
+  }
+
+  /**
+   * Builds a serde context around a {@link LazyBinarySerDe} initialized with the given
+   * column names and types.
+   *
+   * @param contextFlag passed through as the second constructor argument of
+   *        {@link MapJoinObjectSerDeContext} (presumably the "has filter" flag - confirm)
+   */
+  private static MapJoinObjectSerDeContext createContext(String columns, String types,
+      boolean contextFlag) throws Exception {
+    LazyBinarySerDe serde = new LazyBinarySerDe();
+    Properties props = new Properties();
+    props.put(serdeConstants.LIST_COLUMNS, columns);
+    props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+    serde.initialize(null, props);
+    return new MapJoinObjectSerDeContext(serde, contextFlag);
+  }
+}