Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/02 16:16:37 UTC
svn commit: r1463556 [4/15] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/if/
ql/src/gen/thrift/gen-cpp/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/gen/thrift/gen-php/ ql/src/gen/thrift/gen...
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,1124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.ref.SoftReference;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+/*
+ * Contains all the classes that support persisting a PTF partition.
+ */
+public class PTFPersistence {
+
+ @SuppressWarnings("unchecked")
+ public static ByteBasedList createList(String clsName, int capacity) throws HiveException
+ {
+ try
+ {
+ Class<? extends ByteBasedList> cls = (Class<? extends ByteBasedList>) Class.forName(clsName);
+ Constructor<? extends ByteBasedList> cons = cls.getConstructor(Integer.TYPE);
+ return cons.newInstance(capacity);
+ }
+ catch(Exception e)
+ {
+ throw new HiveException(e);
+ }
+ }
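+
+ /*
+ * Illustrative usage of the factory (the class names are the ones defined
+ * below; any configured class must expose a single-int constructor):
+ *
+ * ByteBasedList list = PTFPersistence.createList(
+ * "org.apache.hadoop.hive.ql.exec.PTFPersistence$PartitionedByteBasedList",
+ * ByteBasedList.MEDIUM_SIZE);
+ */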
+
+ public static class ByteBasedList
+ {
+ int startOffset;
+
+ /*
+ * (offset, size) pairs for the serialized Writables:
+ * entry i occupies positions i << 1 and (i << 1) + 1
+ * (e.g. entry 3 is at offsetsArray[6] and offsetsArray[7]).
+ * This array is resizable.
+ */
+ int[] offsetsArray;
+
+ /*
+ * Contains the actual bytes of the Writables.
+ * Not resizable.
+ */
+ byte[] bytes;
+ int bytesUsed;
+
+ int currentSize;
+ ReentrantReadWriteLock lock;
+ volatile long lastModified;
+
+
+ public ByteBasedList(int startOffset, int capacity)
+ {
+ this.startOffset = startOffset;
+ bytes = new byte[capacity];
+ offsetsArray = new int[INCREMENT_SIZE];
+ bytesUsed = 0;
+ currentSize = 0;
+ lock = new ReentrantReadWriteLock();
+ lastModified = System.nanoTime();
+ }
+
+ public ByteBasedList()
+ {
+ this(0, MEDIUM_SIZE);
+ }
+
+ protected void reset(int startOffset) throws HiveException {
+ PTFPersistence.lock(lock.writeLock());
+ try
+ {
+ this.startOffset = startOffset;
+ bytesUsed = 0;
+ currentSize = 0;
+ Arrays.fill(offsetsArray, 0);
+ lastModified = System.nanoTime();
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public ByteBasedList(int capacity)
+ {
+ this(0, capacity);
+ }
+
+ /*
+ * Internal API; used by {@link PersistentByteBasedList} to set up a ByteBasedList from a file.
+ */
+ protected ByteBasedList(File file)
+ {
+ lock = new ReentrantReadWriteLock();
+ }
+
+ private void ensureCapacity(int wlen) throws ListFullException
+ {
+ if ( bytesUsed + wlen > bytes.length)
+ {
+ throw new ListFullException();
+ }
+
+ if ( (2 * currentSize + 1) > offsetsArray.length )
+ {
+ int[] na = new int[offsetsArray.length + INCREMENT_SIZE];
+ System.arraycopy(offsetsArray, 0, na, 0, offsetsArray.length);
+ offsetsArray = na;
+ }
+ }
+
+ private int index(int i) throws HiveException
+ {
+ int j = i - startOffset;
+ j = j << 1;
+ if ( j < 0 || j >= 2 * currentSize )
+ {
+ throw new HiveException(String.format("index invalid %d", i));
+ }
+ return j;
+ }
+
+ private void write(Writable w) throws HiveException, IOException
+ {
+ DataOStream dos = PTFPersistence.dos.get();
+ ByteArrayOS bos = dos.getUnderlyingStream();
+ bos.reset();
+ w.write(dos);
+ ensureCapacity(bos.len());
+ int i = currentSize * 2;
+ System.arraycopy(bos.bytearray(), 0, bytes, bytesUsed, bos.len());
+ offsetsArray[i] = bytesUsed;
+ offsetsArray[i+1] = bos.len();
+ currentSize += 1;
+ bytesUsed += bos.len();
+ lastModified = System.nanoTime();
+ }
+
+
+ public int size() throws HiveException
+ {
+ PTFPersistence.lock(lock.readLock());
+ try
+ {
+ return currentSize;
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void get(int i, Writable wObj) throws HiveException
+ {
+ PTFPersistence.lock(lock.readLock());
+ try
+ {
+ i = index(i);
+ DataIStream dis = PTFPersistence.dis.get();
+ ByteArrayIS bis = dis.getUnderlyingStream();
+ bis.setBuffer(bytes, offsetsArray[i], offsetsArray[i+1]);
+ wObj.readFields(dis);
+ }
+ catch(IOException ie)
+ {
+ throw new HiveException(ie);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void append(Writable obj) throws HiveException
+ {
+ PTFPersistence.lock(lock.writeLock());
+ try
+ {
+ write(obj);
+ }
+ catch(IOException ie)
+ {
+ throw new HiveException(ie);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+
+ }
+
+ public Object get(int i, Deserializer deserializer, Writable wObj) throws HiveException
+ {
+ try
+ {
+ get(i, wObj);
+ return deserializer.deserialize(wObj);
+ }
+ catch(SerDeException ie)
+ {
+ throw new HiveException(ie);
+ }
+ }
+
+ public void append(Object obj, ObjectInspector OI, Serializer serializer) throws HiveException
+ {
+ try
+ {
+ append(serializer.serialize(obj, OI));
+ }
+ catch(SerDeException ie)
+ {
+ throw new HiveException(ie);
+ }
+ }
+
+ public Iterator<Writable> iterator(Writable wObj) throws HiveException
+ {
+ return new WIterator(wObj, startOffset);
+ }
+
+ public Iterator<Object> iterator(Deserializer deserializer, Writable wObj) throws HiveException
+ {
+ return new OIterator(deserializer, wObj);
+ }
+
+ public void dump(StringBuilder bldr, Writable wObj) throws IOException, HiveException
+ {
+ bldr.append("[");
+ Iterator<Writable> wi = iterator(wObj);
+ while(wi.hasNext())
+ {
+ wObj = wi.next();
+ bldr.append(wObj).append(", ");
+ }
+ bldr.append("]\n");
+ }
+
+ public void dump(StringBuilder bldr, Deserializer deserializer, Writable wObj) throws IOException, HiveException
+ {
+ bldr.append("[");
+ Iterator<Object> oi = iterator(deserializer, wObj);
+ while(oi.hasNext())
+ {
+ bldr.append(oi.next()).append(", ");
+ }
+ bldr.append("]\n");
+ }
+
+ class WIterator implements Iterator<Writable>
+ {
+ Writable wObj;
+ long checkTime;
+ int i;
+
+ WIterator(Writable wObj, int offset)
+ {
+ this.wObj = wObj;
+ checkTime = lastModified;
+ i = offset;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ // i starts at startOffset, so compare against the absolute end index
+ return i < startOffset + currentSize;
+ }
+
+ @Override
+ public Writable next()
+ {
+ if (checkTime != lastModified) {
+ throw new ConcurrentModificationException();
+ }
+ try
+ {
+ get(i++, wObj);
+ return wObj;
+ }
+ catch(HiveException be)
+ {
+ throw new RuntimeException(be);
+ }
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ class OIterator implements Iterator<Object>
+ {
+ Deserializer deserializer;
+ Iterator<Writable> wi;
+
+ OIterator(Deserializer deserializer, Writable wObj) throws HiveException
+ {
+ wi = iterator(wObj);
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return wi.hasNext();
+ }
+
+ @Override
+ public Object next()
+ {
+ Writable wObj = wi.next();
+ try
+ {
+ return deserializer.deserialize(wObj);
+ }catch(SerDeException se)
+ {
+ throw new RuntimeException(se);
+ }
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static class ListFullException extends HiveException
+ {
+ private static final long serialVersionUID = 4745303310812778989L;
+
+ public ListFullException()
+ {
+ super();
+ }
+
+ public ListFullException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public ListFullException(String message)
+ {
+ super(message);
+ }
+
+ public ListFullException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ }
+
+ private static final int INCREMENT_SIZE = (int) Math.pow(2, 16);
+
+ public static final int SMALL_SIZE = (int) Math.pow(2, 6 + 10); // 64KB
+ public static final int MEDIUM_SIZE = (int) Math.pow(2, 10 + 10 + 3); // 8MB
+ public static final int LARGE_SIZE = (int) Math.pow(2, 6 + 10 + 10); // 64MB
+
+ }
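+
+ /*
+ * A minimal usage sketch for ByteBasedList (illustrative, not part of this
+ * patch): append Writables until the fixed byte buffer fills, then read
+ * them back by index.
+ *
+ * ByteBasedList list = new ByteBasedList(ByteBasedList.SMALL_SIZE);
+ * list.append(new org.apache.hadoop.io.Text("hello"));
+ * org.apache.hadoop.io.Text out = new org.apache.hadoop.io.Text();
+ * list.get(0, out); // out now holds "hello"
+ *
+ * append() throws ListFullException once bytes[] is exhausted;
+ * PartitionedByteBasedList below handles that by spilling to disk.
+ */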
+
+ public static class PartitionedByteBasedList extends ByteBasedList
+ {
+ ArrayList<ByteBasedList> partitions;
+ ArrayList<Integer> partitionOffsets;
+ ArrayList<File> reusableFiles;
+ File dir;
+ int batchSize;
+
+ public PartitionedByteBasedList(int batchSize) throws HiveException
+ {
+ this.batchSize = batchSize;
+ currentSize = 0;
+ dir = PartitionedByteBasedList.createTempDir();
+ Runtime.getRuntime().addShutdownHook(new ShutdownHook(dir));
+
+ partitions = new ArrayList<ByteBasedList>();
+ partitionOffsets = new ArrayList<Integer>();
+ reusableFiles = new ArrayList<File>();
+ addPartition();
+ }
+
+ public PartitionedByteBasedList() throws HiveException
+ {
+ this(ByteBasedList.LARGE_SIZE);
+ }
+
+ @Override
+ protected void reset(int startOffset) throws HiveException {
+ PTFPersistence.lock(lock.writeLock());
+ try {
+ currentSize = 0;
+ for(int i=0; i < partitions.size() - 1; i++) {
+ PersistentByteBasedList p = (PersistentByteBasedList)
+ partitions.remove(0);
+ reusableFiles.add(p.getFile());
+ partitionOffsets.remove(0);
+ }
+ partitions.get(0).reset(0);
+ partitionOffsets.set(0, currentSize);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void addPartition() throws HiveException
+ {
+ try
+ {
+ if ( partitions.size() > 0 )
+ {
+ int idx = partitions.size() - 1;
+ ByteBasedList bl = partitions.get(idx);
+ File f;
+ if ( reusableFiles.size() > 0 ) {
+ f = reusableFiles.remove(0);
+ }
+ else {
+ f = File.createTempFile("wdw", null, dir);
+ }
+ PersistentByteBasedList.store(bl, f);
+ partitions.set(idx, new PersistentByteBasedList(f, bl));
+
+ }
+ ByteBasedList bl = new ByteBasedList(currentSize, batchSize);
+ partitions.add(bl);
+ partitionOffsets.add(currentSize);
+ }
+ catch(IOException ie)
+ {
+ throw new HiveException(ie);
+ }
+ }
+
+ private ByteBasedList getPartition(int i) throws HiveException
+ {
+ PTFPersistence.lock(lock.readLock());
+ try
+ {
+ int numSplits = partitions.size();
+ if ( numSplits == 1) {
+ // single partition: no need for the binary search below
+ return partitions.get(0);
+ }
+ int start = 0;
+ int end = numSplits - 1;
+
+ while(start < end)
+ {
+ int mid = (start + end + 1) >>> 1;
+ int val = partitionOffsets.get(mid);
+ if ( val == i )
+ {
+ return partitions.get(mid);
+ }
+ else if ( val < i )
+ {
+ if ( end == mid)
+ {
+ return partitions.get(end);
+ }
+ start = mid;
+ }
+ else
+ {
+ end = mid - 1;
+ }
+ }
+ return partitions.get(start);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void get(int i, Writable wObj) throws HiveException
+ {
+ ByteBasedList bl = getPartition(i);
+ bl.get(i, wObj);
+ }
+
+ @Override
+ public void append(Writable obj) throws HiveException
+ {
+ PTFPersistence.lock(lock.writeLock());
+ try
+ {
+ partitions.get(partitions.size() -1).append(obj);
+ currentSize += 1;
+ lastModified = System.nanoTime();
+ }
+ catch(ListFullException le)
+ {
+ addPartition();
+ append(obj);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+
+ }
+
+ @Override
+ public Object get(int i, Deserializer deserializer, Writable wObj) throws HiveException
+ {
+ ByteBasedList bl = getPartition(i);
+ return bl.get(i, deserializer, wObj);
+ }
+
+ @Override
+ public void append(Object obj, ObjectInspector OI, Serializer serializer) throws HiveException
+ {
+ PTFPersistence.lock(lock.writeLock());
+ try
+ {
+ partitions.get(partitions.size() -1).append(obj, OI, serializer);
+ currentSize += 1;
+ lastModified = System.nanoTime();
+ }
+ catch(ListFullException le)
+ {
+ addPartition();
+ append(obj, OI, serializer);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public Iterator<Writable> iterator(Writable wObj) throws HiveException
+ {
+ return new WIterator(wObj);
+ }
+
+ class WIterator implements Iterator<Writable>
+ {
+ Writable wObj;
+ long checkTime;
+ int i;
+ Iterator<Writable> pIter;
+
+ WIterator(Writable wObj) throws HiveException
+ {
+ this.wObj = wObj;
+ checkTime = lastModified;
+ i = 0;
+ // consume partition 0 and advance i, so hasNext() moves on to partition 1
+ pIter = partitions.get(i++).iterator(wObj);
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ if ( pIter.hasNext() ) {
+ return true;
+ }
+ if (checkTime != lastModified) {
+ throw new ConcurrentModificationException();
+ }
+ try
+ {
+ if ( i < partitions.size() )
+ {
+ pIter = partitions.get(i++).iterator(wObj);
+ return hasNext();
+ }
+ return false;
+ }
+ catch(HiveException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Writable next()
+ {
+ if (checkTime != lastModified) {
+ throw new ConcurrentModificationException();
+ }
+ return pIter.next();
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ static class ShutdownHook extends Thread
+ {
+ File dir;
+
+ public ShutdownHook(File dir)
+ {
+ this.dir = dir;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ PartitionedByteBasedList.deleteRecursively(dir);
+ }
+ catch(IOException ie)
+ {
+ }
+ }
+
+ }
+
+ // copied verbatim from guava r09 source
+ /**
+ * Deletes a file or directory and all contents recursively.
+ *
+ * <p>
+ * If the file argument is a symbolic link the link will be deleted but not
+ * the target of the link. If the argument is a directory, symbolic links
+ * within the directory will not be followed.
+ *
+ * @param file
+ * the file to delete
+ * @throws IOException
+ * if an I/O error occurs
+ * @see #deleteDirectoryContents
+ */
+ public static void deleteRecursively(File file) throws IOException
+ {
+ if (file.isDirectory())
+ {
+ deleteDirectoryContents(file);
+ }
+ if (!file.delete())
+ {
+ throw new IOException("Failed to delete " + file);
+ }
+ }
+
+ // copied verbatim from guava r09 source
+ /**
+ * Deletes all the files within a directory. Does not delete the directory
+ * itself.
+ *
+ * <p>
+ * If the file argument is a symbolic link or there is a symbolic link in
+ * the path leading to the directory, this method will do nothing. Symbolic
+ * links within the directory are not followed.
+ *
+ * @param directory
+ * the directory to delete the contents of
+ * @throws IllegalArgumentException
+ * if the argument is not a directory
+ * @throws IOException
+ * if an I/O error occurs
+ * @see #deleteRecursively
+ */
+ public static void deleteDirectoryContents(File directory)
+ throws IOException
+ {
+ /*Preconditions.checkArgument(directory.isDirectory(),
+ "Not a directory: %s", directory);
+ */
+ if ( !directory.isDirectory())
+ {
+ throw new IOException(String.format("Not a directory: %s", directory));
+ }
+
+ // Symbolic links will have different canonical and absolute paths
+ if (!directory.getCanonicalPath().equals(directory.getAbsolutePath()))
+ {
+ return;
+ }
+ File[] files = directory.listFiles();
+ if (files == null)
+ {
+ throw new IOException("Error listing files for " + directory);
+ }
+ for (File file : files)
+ {
+ deleteRecursively(file);
+ }
+ }
+
+ // copied from guava so as not to add a dependency on guava
+ /** Maximum loop count when creating temp directories. */
+ private static final int TEMP_DIR_ATTEMPTS = 10000;
+ public static File createTempDir()
+ {
+ File baseDir = new File(System.getProperty("java.io.tmpdir"));
+ String baseName = System.currentTimeMillis() + "-";
+
+ for (int counter = 0; counter < TEMP_DIR_ATTEMPTS; counter++)
+ {
+ File tempDir = new File(baseDir, baseName + counter);
+ if (tempDir.mkdir())
+ {
+ return tempDir;
+ }
+ }
+ throw new IllegalStateException("Failed to create directory within "
+ + TEMP_DIR_ATTEMPTS + " attempts (tried " + baseName + "0 to "
+ + baseName + (TEMP_DIR_ATTEMPTS - 1) + ')');
+ }
+
+ }
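+
+ /*
+ * Sketch of the spill behaviour above (illustrative workload): a
+ * PartitionedByteBasedList never surfaces ListFullException; when the
+ * current in-memory partition fills, it is written to a temp file via
+ * PersistentByteBasedList.store() and a fresh in-memory partition is begun.
+ *
+ * ByteBasedList list = new PartitionedByteBasedList(ByteBasedList.SMALL_SIZE);
+ * for (int i = 0; i < 1000000; i++) {
+ * list.append(new org.apache.hadoop.io.Text("row-" + i)); // spills as needed
+ * }
+ * org.apache.hadoop.io.Text w = new org.apache.hadoop.io.Text();
+ * list.get(999999, w); // faults the right partition back in
+ */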
+
+ static class PersistentByteBasedList extends ByteBasedList
+ {
+ private static int headerSize() { return (Integer.SIZE + Integer.SIZE + Integer.SIZE + Long.SIZE) / Byte.SIZE;}
+ protected static void store(ByteBasedList l, File f) throws IOException
+ {
+ /*
+ * write startOffset:bytesUsed:currentSize:lastModified
+ */
+ int hdrSize = headerSize();
+ ByteBuffer buf = ByteBuffer.allocate(hdrSize);
+
+ buf.putInt(l.startOffset);
+ buf.putInt(l.bytesUsed);
+ buf.putInt(l.currentSize);
+ buf.putLong(l.lastModified);
+ buf.flip();
+
+ /*
+ * note: could save this space by using Memory-Mapped I/O and directly writing to the MM buffer.
+ */
+ ByteBuffer offsetB = ByteBuffer.allocate((Integer.SIZE/Byte.SIZE) * 2 * l.currentSize);
+ IntBuffer iB = offsetB.asIntBuffer();
+ iB.put(l.offsetsArray, 0, l.currentSize * 2);
+
+ ByteBuffer bytesB = ByteBuffer.wrap(l.bytes, 0, l.bytesUsed);
+
+ ByteBuffer[] bufs = new ByteBuffer[] { buf, offsetB, bytesB};
+ FileOutputStream fos = new FileOutputStream(f);
+ try
+ {
+ FileChannel fc = fos.getChannel();
+ while (fc.write(bufs, 0, bufs.length) > 0) {
+ ;
+ }
+ }
+ finally
+ {
+ fos.close();
+ }
+ }
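+
+ /*
+ * On-disk layout written by store() and read back by load(), for reference:
+ *
+ * int startOffset
+ * int bytesUsed
+ * int currentSize
+ * long lastModified
+ * int[] offsetsArray (2 * currentSize ints)
+ * byte[] bytes (bytesUsed bytes)
+ */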
+
+ protected static void load(ByteBasedList l, File f) throws IOException
+ {
+ int hdr = headerSize();
+ FileInputStream fis = new FileInputStream(f);
+ try
+ {
+ FileChannel fc = fis.getChannel();
+ ByteBuffer buf0 = ByteBuffer.allocate(hdr);
+ while (buf0.hasRemaining()) {
+ fc.read(buf0);
+ }
+ buf0.flip();
+ l.startOffset = buf0.getInt();
+ l.bytesUsed = buf0.getInt();
+ l.currentSize = buf0.getInt();
+ l.lastModified = buf0.getLong();
+
+ /*
+ * note: could save this space by using Memory-Mapped I/O and directly writing to the MM buffer.
+ */
+ ByteBuffer offsetB = ByteBuffer.allocate((Integer.SIZE/Byte.SIZE) * 2 * l.currentSize);
+ ByteBuffer bytesB = ByteBuffer.allocate(l.bytesUsed);
+ ByteBuffer[] bufs = new ByteBuffer[] { offsetB, bytesB };
+ while (fc.read(bufs) > 0) {
+ ;
+ }
+
+ l.offsetsArray = new int[l.currentSize * 2];
+ offsetB.flip();
+ IntBuffer iB = offsetB.asIntBuffer();
+ iB.get(l.offsetsArray);
+ l.bytes = bytesB.array();
+ }
+ finally
+ {
+ fis.close();
+ }
+ }
+
+ File file;
+ SoftReference<ByteBasedList> memList;
+
+ protected PersistentByteBasedList(File file, ByteBasedList l)
+ {
+ super(file);
+ this.file = file;
+ memList = new SoftReference<ByteBasedList>(l);
+ }
+
+ protected PersistentByteBasedList(File file)
+ {
+ this(file, null);
+ }
+
+ @Override
+ protected void reset(int startOffset) throws HiveException {
+ throw new HiveException("Reset on PersistentByteBasedList not supported");
+ }
+
+ private ByteBasedList getList() throws HiveException
+ {
+ PTFPersistence.lock(lock.readLock());
+ try
+ {
+ ByteBasedList list = memList.get();
+ if (list == null)
+ {
+ try
+ {
+ list = new ByteBasedList(file);
+ load(list, file);
+ memList = new SoftReference<ByteBasedList>(list);
+ }
+ catch(Exception ie)
+ {
+ throw new RuntimeException(ie);
+ }
+ }
+ return list;
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
+ File getFile() {
+ return file;
+ }
+
+ @Override
+ public int size() throws HiveException
+ {
+ return getList().size();
+ }
+
+ @Override
+ public void get(int i, Writable wObj) throws HiveException
+ {
+ getList().get(i, wObj);
+ }
+
+ @Override
+ public void append(Writable obj) throws HiveException
+ {
+ throw new UnsupportedOperationException("Cannot append to a Persisted List");
+ }
+
+ @Override
+ public Object get(int i, Deserializer deserializer, Writable wObj) throws HiveException
+ {
+ return getList().get(i, deserializer, wObj);
+ }
+
+ @Override
+ public void append(Object obj, ObjectInspector OI, Serializer serializer) throws HiveException
+ {
+ throw new UnsupportedOperationException("Cannot append to a Persisted List");
+ }
+
+ @Override
+ public Iterator<Writable> iterator(Writable wObj) throws HiveException
+ {
+ return getList().iterator(wObj);
+ }
+
+ @Override
+ public Iterator<Object> iterator(Deserializer deserializer, Writable wObj) throws HiveException
+ {
+ return getList().iterator(deserializer, wObj);
+ }
+
+ @Override
+ public void dump(StringBuilder bldr, Writable wObj) throws IOException, HiveException
+ {
+ getList().dump(bldr, wObj);
+ }
+
+ @Override
+ public void dump(StringBuilder bldr, Deserializer deserializer, Writable wObj) throws IOException, HiveException
+ {
+ getList().dump(bldr, deserializer, wObj);
+ }
+ }
+
+ public static class ByteBufferInputStream extends InputStream
+ {
+ ByteBuffer buffer;
+ int mark = -1;
+
+ public void initialize(ByteBuffer buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ public void initialize(ByteBuffer buffer, int off, int len)
+ {
+ buffer = buffer.duplicate();
+ buffer.position(off);
+ buffer.limit(off + len);
+ this.buffer = buffer.slice();
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return buffer.hasRemaining() ? (buffer.get() & 0xff) : -1;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException
+ {
+ int remaining = buffer.remaining();
+ if ( remaining == 0 )
+ {
+ return -1; // end of stream, per the InputStream contract
+ }
+ len = Math.min(len, remaining);
+ buffer.get(b, off, len);
+ return len;
+ }
+
+ @Override
+ public boolean markSupported() { return true; }
+
+ @Override
+ public void mark(int readAheadLimit)
+ {
+ mark = buffer.position();
+ }
+
+ @Override
+ public void reset()
+ {
+ if ( mark == -1 ) {
+ throw new IllegalStateException();
+ }
+ buffer.position(mark);
+ mark = -1;
+ }
+ }
+
+ public static class ByteBufferOutputStream extends OutputStream
+ {
+ ByteBuffer buffer;
+
+ public void initialize(ByteBuffer buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ public void initialize(ByteBuffer buffer, int off, int len)
+ {
+ buffer = buffer.duplicate();
+ buffer.position(off);
+ buffer.limit(off + len);
+ this.buffer = buffer.slice();
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ buffer.put((byte) b);
+ }
+
+ @Override
+ public void write(byte b[], int off, int len)
+ {
+ int remaining = buffer.remaining();
+ if ( len > remaining )
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ buffer.put(b, off, len);
+ }
+ }
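+
+ /*
+ * Illustrative round trip through the ByteBuffer-backed streams above
+ * (hypothetical caller, not part of this patch):
+ *
+ * ByteBuffer buf = ByteBuffer.allocate(1024);
+ * ByteBufferOutputStream out = new ByteBufferOutputStream();
+ * out.initialize(buf);
+ * new java.io.DataOutputStream(out).writeInt(42);
+ *
+ * buf.flip();
+ * ByteBufferInputStream in = new ByteBufferInputStream();
+ * in.initialize(buf);
+ * int v = new java.io.DataInputStream(in).readInt(); // 42
+ */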
+
+ public static ThreadLocal<ByteArrayIS> bis = new ThreadLocal<ByteArrayIS>()
+ {
+ @Override
+ protected ByteArrayIS initialValue()
+ {
+ return new ByteArrayIS();
+ }
+ };
+
+ public static ThreadLocal<DataIStream> dis = new ThreadLocal<DataIStream>()
+ {
+ @Override
+ protected DataIStream initialValue()
+ {
+ return new DataIStream(bis.get());
+ }
+ };
+
+ public static ThreadLocal<ByteArrayOS> bos = new ThreadLocal<ByteArrayOS>()
+ {
+ @Override
+ protected ByteArrayOS initialValue()
+ {
+ return new ByteArrayOS();
+ }
+ };
+
+ public static ThreadLocal<DataOStream> dos = new ThreadLocal<DataOStream>()
+ {
+ @Override
+ protected DataOStream initialValue()
+ {
+ return new DataOStream(bos.get());
+ }
+ };
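+
+ /*
+ * The four ThreadLocals above give each thread one reusable
+ * ByteArrayOS -> DataOStream pair for serializing Writables and one
+ * ByteArrayIS -> DataIStream pair for deserializing them, avoiding a stream
+ * allocation per row. This is how ByteBasedList.write() uses them:
+ *
+ * DataOStream dos = PTFPersistence.dos.get();
+ * ByteArrayOS bos = dos.getUnderlyingStream();
+ * bos.reset();
+ * writable.write(dos); // bytes now sit in bos.bytearray()[0..bos.len())
+ */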
+
+
+ public static class DataIStream extends DataInputStream
+ {
+ public DataIStream(ByteArrayIS in)
+ {
+ super(in);
+ }
+
+ public ByteArrayIS getUnderlyingStream() { return (ByteArrayIS) in; }
+ }
+
+ public static class DataOStream extends DataOutputStream
+ {
+ public DataOStream(ByteArrayOS out)
+ {
+ super(out);
+ }
+
+ public ByteArrayOS getUnderlyingStream() { return (ByteArrayOS) out; }
+ }
+
+ public static class ByteArrayOS extends ByteArrayOutputStream
+ {
+ public ByteArrayOS() { super(); }
+ public ByteArrayOS(int size) { super(size); }
+ public final byte[] bytearray() { return buf; }
+ public final int len() { return count; }
+
+ }
+
+ public static class ByteArrayIS extends ByteArrayInputStream
+ {
+ public ByteArrayIS() { super(new byte[0]); }
+ public final byte[] bytearray() { return buf; }
+ public final void setBuffer(byte[] buf, int offset, int len)
+ {
+ this.buf = buf;
+ this.pos = offset;
+ this.count = Math.min(offset + len, buf.length);
+ this.mark = offset;
+ }
+
+ }
+
+ public static void lock(Lock lock) throws HiveException
+ {
+ try
+ {
+ lock.lockInterruptibly();
+
+ }
+ catch(InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ throw new HiveException("Operation interrupted", ie);
+ }
+ }
+
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.beans.BeanInfo;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PersistenceDelegate;
+import java.beans.PropertyDescriptor;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.antlr.runtime.CommonToken;
+import org.antlr.runtime.tree.BaseTree;
+import org.antlr.runtime.tree.CommonTree;
+import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+public class PTFUtils {
+
+ public static String toString(List<?> col)
+ {
+ StringBuilder buf = new StringBuilder();
+ buf.append("[");
+ boolean first = true;
+ for (Object o : col)
+ {
+ if (first) {
+ first = false;
+ } else {
+ buf.append(", ");
+ }
+ buf.append(o.toString());
+ }
+ buf.append("]");
+ return buf.toString();
+ }
+
+ public static String toString(Map<?, ?> col)
+ {
+ StringBuilder buf = new StringBuilder();
+ buf.append("[");
+ boolean first = true;
+ for (Map.Entry<?, ?> o : col.entrySet())
+ {
+ if (first) {
+ first = false;
+ } else {
+ buf.append(", ");
+ }
+ buf.append(o.getKey().toString()).append(" : ")
+ .append(o.getValue().toString());
+ }
+ buf.append("]");
+ return buf.toString();
+ }
+
+ public static String unescapeQueryString(String qry)
+ {
+ qry = qry.replace("\\\"", "\"");
+ qry = qry.replace("\\'", "'");
+ return qry;
+ }
+
+ public static class ReverseIterator<T> implements Iterator<T>
+ {
+ Stack<T> stack;
+
+ public ReverseIterator(Iterator<T> it)
+ {
+ stack = new Stack<T>();
+ while (it.hasNext())
+ {
+ stack.push(it.next());
+ }
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return !stack.isEmpty();
+ }
+
+ @Override
+ public T next()
+ {
+ return stack.pop();
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static abstract class Predicate<T>
+ {
+ public abstract boolean apply(T obj);
+ };
+
+
+
+ /*
+ * serialization functions
+ */
+ public static void serialize(OutputStream out, Object o)
+ {
+ XMLEncoder e = new XMLEncoder(out);
+ e.setExceptionListener(new EL());
+ PTFUtils.addPersistenceDelegates(e);
+ e.writeObject(o);
+ e.close();
+ }
+
+ public static Object deserialize(InputStream in1)
+ {
+ XMLDecoder d = null;
+ try
+ {
+ d = new XMLDecoder(in1, null, null);
+ return d.readObject();
+ }
+ finally
+ {
+ if (null != d)
+ {
+ d.close();
+ }
+ }
+ }
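+
+ /*
+ * Round-trip sketch (illustrative): serialize() registers the ANTLR/Hive
+ * persistence delegates below before encoding, so plan objects survive the
+ * XML round trip.
+ *
+ * java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
+ * PTFUtils.serialize(bos, somePlanObject); // somePlanObject is hypothetical
+ * Object copy = PTFUtils.deserialize(
+ * new java.io.ByteArrayInputStream(bos.toByteArray()));
+ */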
+
+ public static void addPersistenceDelegates(XMLEncoder e)
+ {
+ addAntlrPersistenceDelegates(e);
+ addHivePersistenceDelegates(e);
+ addEnumDelegates(e);
+ }
+
+ public static void addEnumDelegates(XMLEncoder e)
+ {
+ e.setPersistenceDelegate(Direction.class, new EnumDelegate());
+ }
+
+ public static void addAntlrPersistenceDelegates(XMLEncoder e)
+ {
+ e.setPersistenceDelegate(ASTNode.class, new PersistenceDelegate()
+ {
+
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out)
+ {
+ return new Expression(oldInstance, oldInstance.getClass(),
+ "new", new Object[]
+ { ((ASTNode) oldInstance).getToken() });
+ }
+ });
+ e.setPersistenceDelegate(CommonTree.class, new PersistenceDelegate()
+ {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out)
+ {
+ return new Expression(oldInstance, oldInstance.getClass(),
+ "new", new Object[]
+ { ((CommonTree) oldInstance).getToken() });
+ }
+ });
+ e.setPersistenceDelegate(BaseTree.class, new PersistenceDelegate()
+ {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out)
+ {
+ return new Expression(oldInstance, oldInstance.getClass(),
+ "new", new Object[]
+ {});
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ protected void initialize(Class type, Object oldInstance,
+ Object newInstance, Encoder out)
+ {
+ super.initialize(type, oldInstance, newInstance, out);
+
+ BaseTree t = (BaseTree) oldInstance;
+
+ for (int i = 0; i < t.getChildCount(); i++)
+ {
+ out.writeStatement(new Statement(oldInstance, "addChild",
+ new Object[]
+ { t.getChild(i) }));
+ }
+ }
+ });
+ e.setPersistenceDelegate(CommonToken.class, new PersistenceDelegate()
+ {
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out)
+ {
+ return new Expression(oldInstance, oldInstance.getClass(),
+ "new", new Object[]
+ { ((CommonToken) oldInstance).getType(),
+ ((CommonToken) oldInstance).getText() });
+ }
+ });
+ }
+
+ public static void addHivePersistenceDelegates(XMLEncoder e)
+ {
+ e.setPersistenceDelegate(PrimitiveTypeInfo.class,
+ new PersistenceDelegate()
+ {
+ @Override
+ protected Expression instantiate(Object oldInstance,
+ Encoder out)
+ {
+ return new Expression(oldInstance,
+ TypeInfoFactory.class, "getPrimitiveTypeInfo",
+ new Object[]
+ { ((PrimitiveTypeInfo) oldInstance)
+ .getTypeName() });
+ }
+ });
+ }
+
+ static class EL implements ExceptionListener
+ {
+ public void exceptionThrown(Exception e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException("Cannot serialize the query plan", e);
+ }
+ }
+
+ public static void makeTransient(Class<?> beanClass, String pdName)
+ {
+ BeanInfo info;
+ try
+ {
+ info = Introspector.getBeanInfo(beanClass);
+ PropertyDescriptor[] propertyDescriptors = info
+ .getPropertyDescriptors();
+ for (int i = 0; i < propertyDescriptors.length; ++i)
+ {
+ PropertyDescriptor pd = propertyDescriptors[i];
+ if (pd.getName().equals(pdName))
+ {
+ pd.setValue("transient", Boolean.TRUE);
+ }
+ }
+ }
+ catch (IntrospectionException ie)
+ {
+ throw new RuntimeException(ie);
+ }
+ }
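+
+ /*
+ * Illustrative call (hypothetical bean and property name): marking a
+ * property transient keeps XMLEncoder from serializing it.
+ *
+ * PTFUtils.makeTransient(SomeDesc.class, "astNode");
+ */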
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+@Documented
+public @interface PartitionTableFunctionDescription
+{
+ Description description();
+
+ /**
+ * If true, the function is not usable directly in the language. {@link WindowingTableFunction} is the only internal function.
+ */
+ boolean isInternal() default false;
+}
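+
+/*
+ * Illustrative use on a table function resolver (hypothetical class name):
+ *
+ * @PartitionTableFunctionDescription(
+ * description = @Description(name = "noop", value = "_FUNC_(...) - pass-through PTF"),
+ * isInternal = false)
+ * public class NoopResolver extends TableFunctionResolver { ... }
+ */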
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+@Documented
+public @interface WindowFunctionDescription
+{
+ Description description();
+ /**
+ * Controls whether this function can be applied to a Window.
+ * <p>
+ * The ranking functions Rank, Dense_Rank, Percent_Rank and Cume_Dist don't operate on Windows.
+ * Why? A window specification implies a row-specific range, i.e. every row gets its own set of rows
+ * over which the UDAF is evaluated; for ranking, defining a set of rows per row makes no sense.
+ * <p>
+ * All other UDAFs can be computed for a Window.
+ */
+ boolean supportsWindow() default true;
+ /**
+ * A WindowFunc is implemented as a {@link GenericUDAFResolver2}, which returns only one value.
+ * If this is true, the function must instead return a List, which is taken to be the column for
+ * this function in the output table returned by the {@link WindowingTableFunction}. Otherwise the
+ * output is assumed to be a single value, and the output column will contain the same value for all rows.
+ */
+ boolean pivotResult() default false;
+}
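+
+/*
+ * Illustrative use on a UDAF resolver (hypothetical values):
+ *
+ * @WindowFunctionDescription(
+ * description = @Description(name = "rank", value = "_FUNC_(x) - rank within partition"),
+ * supportsWindow = false,
+ * pivotResult = true)
+ * public class GenericUDAFRank extends AbstractGenericUDAFResolver { ... }
+ */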
+
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+
+@SuppressWarnings("deprecation")
+public class WindowFunctionInfo
+{
+ boolean supportsWindow = true;
+ boolean pivotResult = false;
+ FunctionInfo fInfo;
+
+ WindowFunctionInfo(FunctionInfo fInfo)
+ {
+ assert fInfo.isGenericUDAF();
+ this.fInfo = fInfo;
+ Class<? extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass();
+ WindowFunctionDescription def = wfnCls.getAnnotation(WindowFunctionDescription.class);
+ if ( def != null)
+ {
+ supportsWindow = def.supportsWindow();
+ pivotResult = def.pivotResult();
+ }
+ }
+
+ public boolean isSupportsWindow()
+ {
+ return supportsWindow;
+ }
+
+ public boolean isPivotResult()
+ {
+ return pivotResult;
+ }
+
+ public FunctionInfo getfInfo()
+ {
+ return fInfo;
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java Tue Apr 2 14:16:34 2013
@@ -47,7 +47,8 @@ public class LineageInfo implements Seri
* set operations like union on columns on other tables
* e.g. T2.c1 = T1.c1 + T3.c1.
* 4. SCRIPT - Indicates that the column is derived from the output
- * of a user script through a TRANSFORM, MAP or REDUCE syntax.
+ * of a user script through a TRANSFORM, MAP or REDUCE syntax
+ * or from the output of a PTF chain execution.
*/
public static enum DependencyType {
SIMPLE, EXPRESSION, SCRIPT
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Tue Apr 2 14:16:34 2013
@@ -23,19 +23,19 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -114,6 +114,9 @@ public class ColumnPruner implements Tra
opRules.put(new RuleRegExp("R9",
LateralViewForwardOperator.getOperatorName() + "%"),
ColumnPrunerProcFactory.getLateralViewForwardProc());
+ opRules.put(new RuleRegExp("R10",
+ PTFOperator.getOperatorName() + "%"),
+ ColumnPrunerProcFactory.getPTFProc());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Tue Apr 2 14:16:34 2013
@@ -19,8 +19,10 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -31,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.La
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
@@ -62,11 +66,21 @@ import org.apache.hadoop.hive.ql.plan.Gr
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
/**
* Factory for generating the different node processors used by ColumnPruner.
@@ -148,6 +162,170 @@ public final class ColumnPrunerProcFacto
}
/**
+ * - Pruning can only be done for Windowing. PTFs are black boxes,
+ * so we assume all columns are needed.
+ * - Add column names referenced in WindowFn args and in WindowFn expressions
+ * to the pruned list of the child Select Op.
+ * - Prune the column names & types serde properties in each of the Shapes in the PTF chain:
+ * - the InputDef's output shape
+ * - Window Table Functions: window output shape & output shape.
+ * - Why is pruning the column names & types in the serde properties enough?
+ * - Because during runtime we rebuild the OIs using these properties.
+ * - Finally we set the prunedColList on the ColumnPrunerProcCtx
+ * and update the RR & signature on the PTFOp.
+ */
+ public static class ColumnPrunerPTFProc implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+
+ PTFOperator op = (PTFOperator) nd;
+ PTFDesc conf = op.getConf();
+ //Since we cannot know what columns will be needed by a PTF chain,
+ //we do not prune columns on PTFOperator for PTF chains.
+ if (!conf.forWindowing()) {
+ return getDefaultProc().process(nd, stack, ctx, nodeOutputs);
+ }
+
+ ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
+ WindowTableFunctionDef def = (WindowTableFunctionDef) conf.getFuncDef();
+ ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
+
+ List<String> prunedCols = cppCtx.getPrunedColList(op.getChildOperators().get(0));
+ //we create a copy of prunedCols to create a list of pruned columns for PTFOperator
+ prunedCols = new ArrayList<String>(prunedCols);
+ prunedColumnsList(prunedCols, def);
+ setSerdePropsOfShape(def.getInput().getOutputShape(), prunedCols);
+ setSerdePropsOfShape(def.getOutputFromWdwFnProcessing(), prunedCols);
+ setSerdePropsOfShape(def.getOutputShape(), prunedCols);
+
+ RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
+ RowResolver newRR = buildPrunedRR(prunedCols, oldRR, sig);
+ cppCtx.getPrunedColLists().put(op, prunedInputList(prunedCols, def));
+ cppCtx.getOpToParseCtxMap().get(op).setRowResolver(newRR);
+ op.getSchema().setSignature(sig);
+ return null;
+ }
+
+ private static RowResolver buildPrunedRR(List<String> prunedCols,
+ RowResolver oldRR, ArrayList<ColumnInfo> sig) throws SemanticException{
+ RowResolver newRR = new RowResolver();
+ HashSet<String> prunedColsSet = new HashSet<String>(prunedCols);
+ for(ColumnInfo cInfo : oldRR.getRowSchema().getSignature()) {
+ if ( prunedColsSet.contains(cInfo.getInternalName())) {
+ String[] nm = oldRR.reverseLookup(cInfo.getInternalName());
+ newRR.put(nm[0], nm[1], cInfo);
+ sig.add(cInfo);
+ }
+ }
+ return newRR;
+ }
+
+ /*
+ * add any input columns referenced in WindowFn args or expressions.
+ */
+ private void prunedColumnsList(List<String> prunedCols, WindowTableFunctionDef tDef) {
+ if ( tDef.getWindowFunctions() != null ) {
+ for(WindowFunctionDef wDef : tDef.getWindowFunctions() ) {
+ if ( wDef.getArgs() == null) {
+ continue;
+ }
+ for(PTFExpressionDef arg : wDef.getArgs()) {
+ ExprNodeDesc exprNode = arg.getExprNode();
+ Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+ }
+ }
+ }
+ if ( tDef.getWindowExpressions() != null ) {
+ for(WindowExpressionDef expr : tDef.getWindowExpressions()) {
+ ExprNodeDesc exprNode = expr.getExprNode();
+ Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+ }
+ }
+ if(tDef.getPartition() != null){
+ for(PTFExpressionDef col : tDef.getPartition().getExpressions()){
+ ExprNodeDesc exprNode = col.getExprNode();
+ Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+ }
+ }
+ if(tDef.getOrder() != null){
+ for(PTFExpressionDef col : tDef.getOrder().getExpressions()){
+ ExprNodeDesc exprNode = col.getExprNode();
+ Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+ }
+ }
+ }
+
+ private List<String> getLowerCasePrunedCols(List<String> prunedCols){
+ List<String> lowerCasePrunedCols = new ArrayList<String>();
+ for (String col : prunedCols) {
+ lowerCasePrunedCols.add(col.toLowerCase());
+ }
+ return lowerCasePrunedCols;
+ }
+
+ /*
+ * Reconstruct the column names & types lists based on the prunedCols list.
+ */
+ private void setSerdePropsOfShape(ShapeDetails shp, List<String> prunedCols) {
+ List<String> columnNames = Arrays.asList(shp.getSerdeProps().get(
+ org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS).split(","));
+ List<TypeInfo> columnTypes = TypeInfoUtils
+ .getTypeInfosFromTypeString(shp.getSerdeProps().get(
+ org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES));
+ /*
+ * fieldNames in OI are lower-cased. So we compare lower cased names for now.
+ */
+ prunedCols = getLowerCasePrunedCols(prunedCols);
+
+ StringBuilder cNames = new StringBuilder();
+ StringBuilder cTypes = new StringBuilder();
+
+ boolean addComma = false;
+ for(int i=0; i < columnNames.size(); i++) {
+ if ( prunedCols.contains(columnNames.get(i)) ) {
+ cNames.append(addComma ? "," : "");
+ cTypes.append(addComma ? "," : "");
+ cNames.append(columnNames.get(i));
+ cTypes.append(columnTypes.get(i));
+ addComma = true;
+ }
+ }
+ shp.getSerdeProps().put(
+ org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, cNames.toString());
+ shp.getSerdeProps().put(
+ org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, cTypes.toString());
+ }
+
+ /*
+ * From the prunedCols list, filter out columns that refer to WindowFns or WindowExprs;
+ * the returned list is set as the prunedList needed by the PTFOp.
+ */
+ private ArrayList<String> prunedInputList(List<String> prunedCols,
+ WindowTableFunctionDef tDef) {
+ ArrayList<String> prunedInputCols = new ArrayList<String>();
+
+ StructObjectInspector OI = tDef.getInput().getOutputShape().getOI();
+ for(StructField f : OI.getAllStructFieldRefs()) {
+ String fName = f.getFieldName();
+ if ( prunedCols.contains(fName)) {
+ prunedInputCols.add(fName);
+ }
+ }
+
+ return prunedInputCols;
+ }
+ }
+
+ /**
+ * Factory method to get the ColumnPrunerPTFProc class.
+ *
+ * @return ColumnPrunerPTFProc
+ */
+ public static ColumnPrunerPTFProc getPTFProc() {
+ return new ColumnPrunerPTFProc();
+ }
+
+ /**
* The Default Node Processor for Column Pruning.
*/
public static class ColumnPrunerDefaultProc implements NodeProcessor {
@@ -285,6 +463,39 @@ public final class ColumnPrunerProcFacto
}
Collections.sort(colLists);
pruneReduceSinkOperator(flags, op, cppCtx);
+ } else if ((childOperators.size() == 1)
+ && (childOperators.get(0) instanceof ExtractOperator )
+ && (childOperators.get(0).getChildOperators().size() == 1)
+ && (childOperators.get(0).getChildOperators().get(0) instanceof PTFOperator )
+ && ((PTFOperator)childOperators.get(0).
+ getChildOperators().get(0)).getConf().forWindowing() ) {
+
+ /*
+ * For RS that are followed by Extract & PTFOp for windowing
+ * - do the same thing as above. Reconstruct ValueColumn list based on what is required
+ * by the PTFOp.
+ */
+
+ assert parentOperators.size() == 1;
+
+ PTFOperator ptfOp = (PTFOperator) childOperators.get(0).getChildOperators().get(0);
+ List<String> childCols = cppCtx.getPrunedColList(ptfOp);
+ boolean[] flags = new boolean[conf.getValueCols().size()];
+ for (int i = 0; i < flags.length; i++) {
+ flags[i] = false;
+ }
+ if (childCols != null && childCols.size() > 0) {
+ ArrayList<String> outColNames = op.getConf().getOutputValueColumnNames();
+ for(int i=0; i < outColNames.size(); i++ ) {
+ if ( childCols.contains(outColNames.get(i))) {
+ ExprNodeDesc exprNode = op.getConf().getValueCols().get(i);
+ flags[i] = true;
+ Utilities.mergeUniqElems(colLists, exprNode.getCols());
+ }
+ }
+ }
+ Collections.sort(colLists);
+ pruneReduceSinkOperator(flags, op, cppCtx);
} else {
// Reduce Sink contains the columns needed - no need to aggregate from
// children
@@ -831,4 +1042,4 @@ public final class ColumnPrunerProcFacto
return new ColumnPrunerMapJoinProc();
}
-}
+}
\ No newline at end of file
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java Tue Apr 2 14:16:34 2013
@@ -22,17 +22,17 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UDTFOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -82,6 +82,8 @@ public class Generator implements Transf
OpProcFactory.getReduceSinkProc());
opRules.put(new RuleRegExp("R9", LateralViewJoinOperator.getOperatorName() + "%"),
OpProcFactory.getLateralViewJoinProc());
+ opRules.put(new RuleRegExp("R10", PTFOperator.getOperatorName() + "%"),
+ OpProcFactory.getTransformProc());
// The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(), opRules, lCtx);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java Tue Apr 2 14:16:34 2013
@@ -37,10 +37,6 @@ public class ASTNode extends CommonTree
public ASTNode() {
}
- public ASTNode(ASTNode copy){
- super(copy);
- }
-
/**
* Constructor.
*
@@ -51,6 +47,16 @@ public class ASTNode extends CommonTree
super(t);
}
+ public ASTNode(ASTNode node) {
+ super(node);
+ this.origin = node.origin;
+ }
+
+ @Override
+ public Tree dupNode() {
+ return new ASTNode(this);
+ }
+
/*
* (non-Javadoc)
*
@@ -95,12 +101,6 @@ public class ASTNode extends CommonTree
this.origin = origin;
}
- @Override
- public Tree dupNode() {
-
- return new ASTNode(this);
- }
-
public String dump() {
StringBuilder sb = new StringBuilder();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g Tue Apr 2 14:16:34 2013
@@ -139,7 +139,7 @@ fromSource
@init { gParent.msgs.push("from source"); }
@after { gParent.msgs.pop(); }
:
- (tableSource | subQuerySource) (lateralView^)*
+ ((Identifier LPAREN)=> partitionedTableFunction | tableSource | subQuerySource) (lateralView^)*
;
tableBucketSample
@@ -202,6 +202,38 @@ subQuerySource
LPAREN queryStatementExpression RPAREN identifier -> ^(TOK_SUBQUERY queryStatementExpression identifier)
;
+//---------------------- Rules for parsing PTF clauses -----------------------------
+partitioningSpec
+@init { gParent.msgs.push("partitioningSpec clause"); }
+@after { gParent.msgs.pop(); }
+ :
+ partitionByClause orderByClause? -> ^(TOK_PARTITIONINGSPEC partitionByClause orderByClause?) |
+ orderByClause -> ^(TOK_PARTITIONINGSPEC orderByClause) |
+ distributeByClause sortByClause? -> ^(TOK_PARTITIONINGSPEC distributeByClause sortByClause?) |
+ sortByClause -> ^(TOK_PARTITIONINGSPEC sortByClause) |
+ clusterByClause -> ^(TOK_PARTITIONINGSPEC clusterByClause)
+ ;
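+// Illustrative inputs: "partition by a order by b", "distribute by a sort by b",
+// or "cluster by a"; each alternative yields a TOK_PARTITIONINGSPEC subtree.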
+
+partitionTableFunctionSource
+@init { gParent.msgs.push("partitionTableFunctionSource clause"); }
+@after { gParent.msgs.pop(); }
+ :
+ subQuerySource |
+ tableSource |
+ partitionedTableFunction
+ ;
+
+partitionedTableFunction
+@init { gParent.msgs.push("ptf clause"); }
+@after { gParent.msgs.pop(); }
+ :
+ name=Identifier
+ LPAREN KW_ON ptfsrc=partitionTableFunctionSource partitioningSpec?
+ ((Identifier LPAREN expression RPAREN ) => Identifier LPAREN expression RPAREN ( COMMA Identifier LPAREN expression RPAREN)*)?
+ RPAREN alias=Identifier?
+ -> ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+ ;
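+// Illustrative input: "noop(on t1 partition by a order by b) f1" matches with
+// name=noop, ptfsrc=t1 carrying a partitioning spec, and optional alias f1; the
+// guarded "Identifier LPAREN expression RPAREN" list captures any additional
+// function arguments of the form arg(expr).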
+
//----------------------- Rules for parsing whereClause -----------------------------
// where a=b and ...
whereClause
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g Tue Apr 2 14:16:34 2013
@@ -242,6 +242,14 @@ KW_ROLLUP: 'ROLLUP';
KW_CUBE: 'CUBE';
KW_DIRECTORIES: 'DIRECTORIES';
KW_FOR: 'FOR';
+KW_WINDOW: 'WINDOW';
+KW_UNBOUNDED: 'UNBOUNDED';
+KW_PRECEDING: 'PRECEDING';
+KW_FOLLOWING: 'FOLLOWING';
+KW_CURRENT: 'CURRENT';
+KW_LESS: 'LESS';
+KW_MORE: 'MORE';
+KW_OVER: 'OVER';
KW_GROUPING: 'GROUPING';
KW_SETS: 'SETS';
KW_TRUNCATE: 'TRUNCATE';
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Tue Apr 2 14:16:34 2013
@@ -288,6 +288,12 @@ TOK_SKEWED_LOCATIONS;
TOK_SKEWED_LOCATION_LIST;
TOK_SKEWED_LOCATION_MAP;
TOK_STOREDASDIRS;
+TOK_PARTITIONINGSPEC;
+TOK_PTBLFUNCTION;
+TOK_WINDOWDEF;
+TOK_WINDOWSPEC;
+TOK_WINDOWVALUES;
+TOK_WINDOWRANGE;
TOK_IGNOREPROTECTION;
}
@@ -1792,9 +1798,10 @@ regular_body
clusterByClause?
distributeByClause?
sortByClause?
+ window_clause?
limitClause? -> ^(TOK_QUERY fromClause ^(TOK_INSERT insertClause
selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
- distributeByClause? sortByClause? limitClause?))
+ distributeByClause? sortByClause? window_clause? limitClause?))
|
selectStatement
;
@@ -1810,9 +1817,10 @@ selectStatement
clusterByClause?
distributeByClause?
sortByClause?
+ window_clause?
limitClause? -> ^(TOK_QUERY fromClause ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
- distributeByClause? sortByClause? limitClause?))
+ distributeByClause? sortByClause? window_clause? limitClause?))
;
@@ -1827,9 +1835,10 @@ body
clusterByClause?
distributeByClause?
sortByClause?
+ window_clause?
limitClause? -> ^(TOK_INSERT insertClause?
selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
- distributeByClause? sortByClause? limitClause?)
+ distributeByClause? sortByClause? window_clause? limitClause?)
|
selectClause
whereClause?
@@ -1839,9 +1848,10 @@ body
clusterByClause?
distributeByClause?
sortByClause?
+ window_clause?
limitClause? -> ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
- distributeByClause? sortByClause? limitClause?)
+ distributeByClause? sortByClause? window_clause? limitClause?)
;
insertClause
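Each of the query-body alternatives above now accepts an optional window_clause between sortByClause and limitClause and carries it into the generated TOK_QUERY/TOK_INSERT subtree. In other words, a SELECT statement may now end with a WINDOW clause (before any LIMIT) that defines named window specifications for use in OVER clauses; the window_clause rule itself is presumably defined elsewhere in this patch.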
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g Tue Apr 2 14:16:34 2013
@@ -124,7 +124,18 @@ clusterByClause
|
KW_CLUSTER KW_BY
expression
- ( COMMA expression )* -> ^(TOK_CLUSTERBY expression+)
+ ( (COMMA)=>COMMA expression )* -> ^(TOK_CLUSTERBY expression+)
+ ;
+
+partitionByClause
+@init { gParent.msgs.push("partition by clause"); }
+@after { gParent.msgs.pop(); }
+ :
+ KW_PARTITION KW_BY
+ LPAREN expression (COMMA expression)* RPAREN -> ^(TOK_DISTRIBUTEBY expression+)
+ |
+ KW_PARTITION KW_BY
+ expression ((COMMA)=> COMMA expression)* -> ^(TOK_DISTRIBUTEBY expression+)
;
distributeByClause
@@ -135,7 +146,7 @@ distributeByClause
LPAREN expression (COMMA expression)* RPAREN -> ^(TOK_DISTRIBUTEBY expression+)
|
KW_DISTRIBUTE KW_BY
- expression (COMMA expression)* -> ^(TOK_DISTRIBUTEBY expression+)
+ expression ((COMMA)=> COMMA expression)* -> ^(TOK_DISTRIBUTEBY expression+)
;
sortByClause
@@ -148,7 +159,7 @@ sortByClause
|
KW_SORT KW_BY
columnRefOrder
- ( COMMA columnRefOrder)* -> ^(TOK_SORTBY columnRefOrder+)
+ ( (COMMA)=> COMMA columnRefOrder)* -> ^(TOK_SORTBY columnRefOrder+)
;
// fun(par1, par2, par3)
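Two things are worth noting in this file. First, the new partitionByClause rewrites to TOK_DISTRIBUTEBY, so downstream processing treats PARTITION BY as a synonym of DISTRIBUTE BY. Second, the (COMMA)=> syntactic predicates added to the comma loops make ANTLR commit to another expression only after seeing the comma; this appears necessary now that these clauses can occur inside a partitionedTableFunction invocation, where a comma may instead belong to the PTF's argument list.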
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+
+public class PTFInvocationSpec {
+
+ PartitionedTableFunctionSpec function;
+
+ public PartitionedTableFunctionSpec getFunction() {
+ return function;
+ }
+
+ public void setFunction(PartitionedTableFunctionSpec function) {
+ this.function = function;
+ }
+
+ public PartitionedTableFunctionSpec getStartOfChain() {
+ return function == null ? null : function.getStartOfChain();
+ }
+
+ public String getQueryInputName() {
+ return function == null ? null : function.getQueryInputName();
+ }
+
+ public PTFQueryInputSpec getQueryInput() {
+ return function == null ? null : function.getQueryInput();
+ }
+
+ /*
+ * A PTF Input represents the input to a PTF Function. An Input can be a Hive SubQuery, a Table,
+ * or another PTF Function. An Input instance captures the ASTNode that it was created from.
+ */
+ public abstract static class PTFInputSpec {
+ ASTNode astNode;
+
+ public ASTNode getAstNode() {
+ return astNode;
+ }
+
+ public void setAstNode(ASTNode astNode) {
+ this.astNode = astNode;
+ }
+
+ public abstract PTFInputSpec getInput();
+
+ public abstract String getQueryInputName();
+ public abstract PTFQueryInputSpec getQueryInput();
+ }
+
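+ /*
+ * The kind of query source at the bottom of a PTF chain: a Table, a SubQuery,
+ * a component of a decomposed PTF chain, or a windowing invocation.
+ */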
+ public static enum PTFQueryInputType {
+ TABLE,
+ SUBQUERY,
+ PTFCOMPONENT,
+ WINDOWING;
+ }
+
+ /*
+ * A PTF input that represents a source in the overall Query. This could be a Table or a SubQuery.
+ * If a PTF chain requires execution by multiple PTF Operators,
+ * then the original Invocation object is decomposed into a set of Component Invocations.
+ * Every component Invocation but the first one ends in a PTFQueryInputSpec instance.
+ * During the construction of the Operator plan, a PTFQueryInputSpec object in the chain
+ * implies that the PTF Operator should be connected to the 'input', i.e. to the plan
+ * that has been generated so far.
+ */
+ public static class PTFQueryInputSpec extends PTFInputSpec {
+ String source;
+ PTFQueryInputType type;
+
+ public String getSource() {
+ return source;
+ }
+ public void setSource(String source) {
+ this.source = source;
+ }
+ public PTFQueryInputType getType() {
+ return type;
+ }
+ public void setType(PTFQueryInputType type) {
+ this.type = type;
+ }
+
+ @Override
+ public PTFInputSpec getInput() {
+ return null;
+ }
+
+ @Override
+ public String getQueryInputName() {
+ return getSource();
+ }
+ @Override
+ public PTFQueryInputSpec getQueryInput() {
+ return this;
+ }
+ }
+
+ /*
+ * Represents a PTF Invocation. Captures:
+ * - function name and alias
+ * - the Partitioning details about its input
+ * - its arguments. The ASTNodes representing the arguments are captured here.
+ * - a reference to its Input
+ */
+ public static class PartitionedTableFunctionSpec extends PTFInputSpec {
+ String name;
+ String alias;
+ ArrayList<ASTNode> args;
+ PartitioningSpec partitioning;
+ PTFInputSpec input;
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public String getAlias() {
+ return alias;
+ }
+ public void setAlias(String alias) {
+ this.alias = alias;
+ }
+ public ArrayList<ASTNode> getArgs() {
+ return args;
+ }
+ public void setArgs(ArrayList<ASTNode> args) {
+ this.args = args;
+ }
+ public PartitioningSpec getPartitioning() {
+ return partitioning;
+ }
+ public void setPartitioning(PartitioningSpec partitioning) {
+ this.partitioning = partitioning;
+ }
+ @Override
+ public PTFInputSpec getInput() {
+ return input;
+ }
+ public void setInput(PTFInputSpec input) {
+ this.input = input;
+ }
+ public PartitionSpec getPartition() {
+ return getPartitioning() == null ? null : getPartitioning().getPartSpec();
+ }
+ public void setPartition(PartitionSpec partSpec) {
+ partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+ partitioning.setPartSpec(partSpec);
+ }
+ public OrderSpec getOrder() {
+ return getPartitioning() == null ? null : getPartitioning().getOrderSpec();
+ }
+ public void setOrder(OrderSpec orderSpec) {
+ partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+ partitioning.setOrderSpec(orderSpec);
+ }
+ public void addArg(ASTNode arg)
+ {
+ args = args == null ? new ArrayList<ASTNode>() : args;
+ args.add(arg);
+ }
+
+ public PartitionedTableFunctionSpec getStartOfChain() {
+ if ( input instanceof PartitionedTableFunctionSpec ) {
+ return ((PartitionedTableFunctionSpec)input).getStartOfChain();
+ }
+ return this;
+ }
+ @Override
+ public String getQueryInputName() {
+ return input.getQueryInputName();
+ }
+ @Override
+ public PTFQueryInputSpec getQueryInput() {
+ return input.getQueryInput();
+ }
+ }
+
+ /*
+ * Captures how the Input to a PTF Function should be partitioned and
+ * ordered. Refers to a PartitionSpec and an OrderSpec instance.
+ */
+ public static class PartitioningSpec {
+ PartitionSpec partSpec;
+ OrderSpec orderSpec;
+ public PartitionSpec getPartSpec() {
+ return partSpec;
+ }
+ public void setPartSpec(PartitionSpec partSpec) {
+ this.partSpec = partSpec;
+ }
+ public OrderSpec getOrderSpec() {
+ return orderSpec;
+ }
+ public void setOrderSpec(OrderSpec orderSpec) {
+ this.orderSpec = orderSpec;
+ }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((orderSpec == null) ? 0 : orderSpec.hashCode());
+ result = prime * result + ((partSpec == null) ? 0 : partSpec.hashCode());
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ PartitioningSpec other = (PartitioningSpec) obj;
+ if (orderSpec == null) {
+ if (other.orderSpec != null) {
+ return false;
+ }
+ } else if (!orderSpec.equals(other.orderSpec)) {
+ return false;
+ }
+ if (partSpec == null) {
+ if (other.partSpec != null) {
+ return false;
+ }
+ } else if (!partSpec.equals(other.partSpec)) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ /*
+ * Captures how an Input should be Partitioned. This is captured as a
+ * list of ASTNodes that are the expressions in the Distribute/Cluster
+ * by clause specifying the partitioning applied for a PTF invocation.
+ */
+ public static class PartitionSpec {
+ ArrayList<PartitionExpression> expressions;
+
+ public ArrayList<PartitionExpression> getExpressions()
+ {
+ return expressions;
+ }
+
+ public void setExpressions(ArrayList<PartitionExpression> columns)
+ {
+ this.expressions = columns;
+ }
+
+ public void addExpression(PartitionExpression c)
+ {
+ expressions = expressions == null ? new ArrayList<PartitionExpression>() : expressions;
+ expressions.add(c);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((expressions == null) ? 0 : expressions.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ PartitionSpec other = (PartitionSpec) obj;
+ if (expressions == null)
+ {
+ if (other.expressions != null) {
+ return false;
+ }
+ }
+ else if (!expressions.equals(other.expressions)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("partitionColumns=%s",PTFUtils.toString(expressions));
+ }
+ }
+
+ public static class PartitionExpression
+ {
+ ASTNode expression;
+
+ public PartitionExpression() {}
+
+ public PartitionExpression(PartitionExpression peSpec)
+ {
+ expression = peSpec.getExpression();
+ }
+
+ public ASTNode getExpression() {
+ return expression;
+ }
+
+ public void setExpression(ASTNode expression) {
+ this.expression = expression;
+ }
+
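+ /*
+ * Equality and hashing are based on the expression AST's string form (toStringTree).
+ * equals uses isAssignableFrom so that a PartitionExpression can compare equal to an
+ * OrderExpression derived from it.
+ */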
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((expression == null) ? 0 : expression.toStringTree().hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!getClass().isAssignableFrom(obj.getClass())) {
+ return false;
+ }
+ PartitionExpression other = (PartitionExpression) obj;
+ if (expression == null) {
+ if (other.expression != null) {
+ return false;
+ }
+ } else if (!expression.toStringTree().equals(other.expression.toStringTree())) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return expression.toStringTree();
+ }
+
+ }
+
+ /*
+ * Captures how the Input should be Ordered. This is captured as a list
+ * of ASTNodes that are the expressions in the Sort By clause in a
+ * PTF invocation.
+ */
+ public static class OrderSpec
+ {
+ ArrayList<OrderExpression> expressions;
+
+ public OrderSpec() {}
+
+ public OrderSpec(PartitionSpec pSpec)
+ {
+ for(PartitionExpression peSpec : pSpec.getExpressions())
+ {
+ addExpression(new OrderExpression(peSpec));
+ }
+ }
+
+ public ArrayList<OrderExpression> getExpressions()
+ {
+ return expressions;
+ }
+
+ public void setExpressions(ArrayList<OrderExpression> columns)
+ {
+ this.expressions = columns;
+ }
+
+ public void addExpression(OrderExpression c)
+ {
+ expressions = expressions == null ? new ArrayList<OrderExpression>() : expressions;
+ expressions.add(c);
+ }
+
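+ /*
+ * True if this ordering begins with exactly the expressions of the given partition
+ * spec; a null or empty partition spec trivially prefixes any ordering.
+ */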
+ protected boolean isPrefixedBy(PartitionSpec pSpec) {
+ if ( pSpec == null || pSpec.getExpressions() == null) {
+ return true;
+ }
+
+ int pExprCnt = pSpec.getExpressions().size();
+ int exprCnt = getExpressions() == null ? 0 : getExpressions().size();
+
+ if ( exprCnt < pExprCnt ) {
+ return false;
+ }
+
+ for(int i=0; i < pExprCnt; i++) {
+ if ( !pSpec.getExpressions().get(i).equals(getExpressions().get(i)) ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
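+ /*
+ * Prepend the partition expressions, as ascending OrderExpressions, so that the
+ * ordering begins with the partition columns.
+ */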
+ protected void prefixBy(PartitionSpec pSpec) {
+ if ( pSpec == null || pSpec.getExpressions() == null) {
+ return;
+ }
+ if ( expressions == null ) {
+ expressions = new ArrayList<PTFInvocationSpec.OrderExpression>();
+ }
+ for(int i = pSpec.getExpressions().size() - 1; i >= 0; i--) {
+ expressions.add(0, new OrderExpression(pSpec.getExpressions().get(i)));
+ }
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((expressions == null) ? 0 : expressions.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ OrderSpec other = (OrderSpec) obj;
+ if (expressions == null)
+ {
+ if (other.expressions != null) {
+ return false;
+ }
+ }
+ else if (!expressions.equals(other.expressions)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("orderColumns=%s",PTFUtils.toString(expressions));
+ }
+ }
+
+ public static enum Order
+ {
+ ASC,
+ DESC;
+ }
+
+ public static class OrderExpression extends PartitionExpression
+ {
+ Order order;
+
+ public OrderExpression() {}
+
+ public OrderExpression(PartitionExpression peSpec)
+ {
+ super(peSpec);
+ order = Order.ASC;
+ }
+
+ public Order getOrder()
+ {
+ return order;
+ }
+
+ public void setOrder(Order order)
+ {
+ this.order = order;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + ((order == null) ? 0 : order.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (!super.equals(obj)) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ OrderExpression other = (OrderExpression) obj;
+ if (order != other.order) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s %s", super.toString(), order);
+ }
+ }
+
+}
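To make the shape of these spec classes concrete, here is a minimal usage sketch. It is not part of the patch: the table name t1 and function name noop are illustrative, and in Hive these objects are presumably populated during translation from the TOK_PTBLFUNCTION subtree rather than built by hand.

    package org.apache.hadoop.hive.ql.parse;

    import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputSpec;
    import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputType;
    import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionedTableFunctionSpec;

    public class PTFInvocationSpecSketch {
      public static void main(String[] args) {
        // A chain bottoms out in a PTFQueryInputSpec naming a query source.
        PTFQueryInputSpec tableInput = new PTFQueryInputSpec();
        tableInput.setSource("t1");                  // illustrative table name
        tableInput.setType(PTFQueryInputType.TABLE);

        // A function spec wraps its input; function-over-function chains nest via setInput.
        PartitionedTableFunctionSpec fn = new PartitionedTableFunctionSpec();
        fn.setName("noop");                          // illustrative function name
        fn.setInput(tableInput);

        PTFInvocationSpec invocation = new PTFInvocationSpec();
        invocation.setFunction(fn);

        // getStartOfChain() follows input links to the deepest function spec;
        // getQueryInputName() recurses to the underlying table/subquery name.
        System.out.println(invocation.getStartOfChain() == fn); // prints: true
        System.out.println(invocation.getQueryInputName());     // prints: t1
      }
    }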