You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/02 16:16:37 UTC

svn commit: r1463556 [4/15] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/gen/thrift/gen-php/ ql/src/gen/thrift/gen...

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,1124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.ref.SoftReference;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+/*
+ * contains all the classes to support persisting a PTF partition,
+ */
+public class PTFPersistence {
+
+  @SuppressWarnings("unchecked")
+  public static ByteBasedList createList(String clsName, int capacity) throws HiveException
+  {
+    try
+    {
+      Class<? extends ByteBasedList> cls = (Class<? extends ByteBasedList>) Class.forName(clsName);
+      Constructor<? extends ByteBasedList> cons = cls.getConstructor(Integer.TYPE);
+      return cons.newInstance(capacity);
+    }
+    catch(Exception e)
+    {
+      throw new HiveException(e);
+    }
+  }
+
+  public static class ByteBasedList
+  {
+    int startOffset;
+
+    /*
+     * (offset,size) of Writables.
+     * entry i at position i << 1
+     * this array is resizable.
+     */
+    int[] offsetsArray;
+
+    /*
+     * contains actual bytes of Writables.
+     * not resizable
+     */
+    byte[] bytes;
+    int bytesUsed;
+
+    int currentSize;
+    ReentrantReadWriteLock lock;
+    volatile long lastModified;
+
+
+    public ByteBasedList(int startOffset, int capacity)
+    {
+      this.startOffset = startOffset;
+      bytes = new byte[capacity];
+      offsetsArray = new int[INCREMENT_SIZE];
+      bytesUsed = 0;
+      currentSize = 0;
+      lock = new ReentrantReadWriteLock();
+      lastModified = System.nanoTime();
+    }
+
+    public ByteBasedList()
+    {
+      this(0, MEDIUM_SIZE);
+    }
+
+    protected void reset(int startOffset)  throws HiveException {
+      PTFPersistence.lock(lock.writeLock());
+      try
+      {
+        this.startOffset = startOffset;
+        bytesUsed = 0;
+        currentSize = 0;
+        Arrays.fill(offsetsArray, 0);
+        lastModified = System.nanoTime();
+      }
+      finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    public ByteBasedList(int capacity)
+    {
+      this(0, capacity);
+    }
+
+    /*
+     * internal api; used by {@link PersistentByteBasedList} to setup BBList from a file.
+     */
+    protected ByteBasedList(File file)
+    {
+      lock = new ReentrantReadWriteLock();
+    }
+
+    private void ensureCapacity(int wlen) throws ListFullException
+    {
+      if ( bytesUsed + wlen > bytes.length)
+      {
+        throw new ListFullException();
+      }
+
+      if ( (2 * currentSize + 1) > offsetsArray.length )
+      {
+        int[] na = new int[offsetsArray.length + INCREMENT_SIZE];
+        System.arraycopy(offsetsArray, 0, na, 0, offsetsArray.length);
+        offsetsArray = na;
+      }
+    }
+
+    private int index(int i) throws HiveException
+    {
+      int j = i - startOffset;
+      j = j << 1;
+      if ( j >  2 * currentSize )
+      {
+        throw new HiveException(String.format("index invalid %d", i));
+      }
+      return j;
+    }
+
+    private void write(Writable w) throws HiveException, IOException
+    {
+      DataOStream dos = PTFPersistence.dos.get();
+      ByteArrayOS bos = dos.getUnderlyingStream();
+      bos.reset();
+      w.write(dos);
+      ensureCapacity(bos.len());
+      int i = currentSize * 2;
+      System.arraycopy(bos.bytearray(), 0, bytes, bytesUsed, bos.len());
+      offsetsArray[i] = bytesUsed;
+      offsetsArray[i+1] = bos.len();
+      currentSize += 1;
+      bytesUsed += bos.len();
+      lastModified = System.nanoTime();
+    }
+
+
+    public int size() throws HiveException
+    {
+      PTFPersistence.lock(lock.readLock());
+      try
+      {
+        return currentSize;
+      }
+      finally
+      {
+        lock.readLock().unlock();
+      }
+    }
+
+    public void get(int i, Writable wObj) throws HiveException
+    {
+      PTFPersistence.lock(lock.readLock());
+      try
+      {
+        i = index(i);
+        DataIStream dis = PTFPersistence.dis.get();
+        ByteArrayIS bis = dis.getUnderlyingStream();
+        bis.setBuffer(bytes, offsetsArray[i], offsetsArray[i+1]);
+        wObj.readFields(dis);
+      }
+      catch(IOException ie)
+      {
+        throw new HiveException(ie);
+      }
+      finally
+      {
+        lock.readLock().unlock();
+      }
+    }
+
+    public void append(Writable obj) throws HiveException
+    {
+      PTFPersistence.lock(lock.writeLock());
+      try
+      {
+        write(obj);
+      }
+      catch(IOException ie)
+      {
+        throw new HiveException(ie);
+      }
+      finally
+      {
+        lock.writeLock().unlock();
+      }
+
+    }
+
+    public Object get(int i, Deserializer deserializer, Writable wObj) throws HiveException
+    {
+      try
+      {
+        get(i, wObj);
+        return deserializer.deserialize(wObj);
+      }
+      catch(SerDeException ie)
+      {
+        throw new HiveException(ie);
+      }
+    }
+
+    public void append(Object obj, ObjectInspector OI, Serializer serializer) throws HiveException
+    {
+      try
+      {
+        append(serializer.serialize(obj, OI));
+      }
+      catch(SerDeException ie)
+      {
+        throw new HiveException(ie);
+      }
+    }
+
+    public Iterator<Writable> iterator(Writable wObj) throws HiveException
+    {
+      return new WIterator(wObj, startOffset);
+    }
+
+    public Iterator<Object> iterator(Deserializer deserializer, Writable wObj)  throws HiveException
+    {
+      return new OIterator(deserializer, wObj);
+    }
+
+    public void dump(StringBuilder bldr, Writable wObj) throws IOException, HiveException
+    {
+      bldr.append("[");
+      Iterator<Writable> wi = iterator(wObj);
+      while(wi.hasNext())
+      {
+        wObj = wi.next();
+        bldr.append(wObj).append(", ");
+      }
+      bldr.append("]\n");
+    }
+
+    public void dump(StringBuilder bldr, Deserializer deserializer, Writable wObj) throws IOException, HiveException
+    {
+      bldr.append("[");
+      Iterator<Object> oi = iterator(deserializer, wObj);
+      while(oi.hasNext())
+      {
+        bldr.append(oi.next()).append(", ");
+      }
+      bldr.append("]\n");
+    }
+
+    class WIterator implements Iterator<Writable>
+    {
+      Writable wObj;
+      long checkTime;
+      int i;
+
+      WIterator(Writable wObj, int offset)
+      {
+        this.wObj = wObj;
+        checkTime = lastModified;
+        i = offset;
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+        return i < currentSize;
+      }
+
+      @Override
+      public Writable next()
+      {
+         if (checkTime != lastModified) {
+          throw new ConcurrentModificationException();
+        }
+         try
+         {
+           get(i++, wObj);
+           return wObj;
+         }
+         catch(HiveException be)
+         {
+           throw new RuntimeException(be);
+         }
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    class OIterator implements Iterator<Object>
+    {
+      Deserializer deserializer;
+      Iterator<Writable> wi;
+
+      OIterator(Deserializer deserializer, Writable wObj) throws HiveException
+      {
+        wi = iterator(wObj);
+        this.deserializer = deserializer;
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+        return wi.hasNext();
+      }
+
+      @Override
+      public Object next()
+      {
+        Writable wObj = wi.next();
+        try
+        {
+          return deserializer.deserialize(wObj);
+        }catch(SerDeException se)
+         {
+           throw new RuntimeException(se);
+         }
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    public static class ListFullException extends HiveException
+    {
+      private static final long serialVersionUID = 4745303310812778989L;
+
+      public ListFullException()
+      {
+        super();
+      }
+
+      public ListFullException(String message, Throwable cause)
+      {
+        super(message, cause);
+      }
+
+      public ListFullException(String message)
+      {
+        super(message);
+      }
+
+      public ListFullException(Throwable cause)
+      {
+        super(cause);
+      }
+
+    }
+
+    private static final int INCREMENT_SIZE = (int) Math.pow(2, 16);
+
+    public static final int SMALL_SIZE =  (int) Math.pow(2, 6 +10);                // 64KB
+    public static final int MEDIUM_SIZE = (int) Math.pow(2, (10 + 10 + 3));            // 8 MB
+    public static final int LARGE_SIZE = (int) Math.pow(2, (6 + 10 + 10));         // 64 MB
+
+  }
+
+  public static class PartitionedByteBasedList extends ByteBasedList
+  {
+    ArrayList<ByteBasedList> partitions;
+    ArrayList<Integer> partitionOffsets;
+    ArrayList<File> reusableFiles;
+    File dir;
+    int batchSize;
+
+    public PartitionedByteBasedList(int batchSize) throws HiveException
+    {
+      this.batchSize = batchSize;
+      currentSize = 0;
+      dir = PartitionedByteBasedList.createTempDir();
+      Runtime.getRuntime().addShutdownHook(new ShutdownHook(dir));
+
+      partitions = new ArrayList<ByteBasedList>();
+      partitionOffsets = new ArrayList<Integer>();
+      reusableFiles = new ArrayList<File>();
+      addPartition();
+    }
+
+    public PartitionedByteBasedList() throws HiveException
+    {
+      this(ByteBasedList.LARGE_SIZE);
+    }
+
+    @Override
+    protected void reset(int startOffset) throws HiveException {
+      PTFPersistence.lock(lock.writeLock());
+      try {
+        currentSize = 0;
+        for(int i=0; i < partitions.size() - 1; i++) {
+          PersistentByteBasedList p = (PersistentByteBasedList)
+                          partitions.remove(0);
+          reusableFiles.add(p.getFile());
+          partitionOffsets.remove(0);
+        }
+        partitions.get(0).reset(0);
+        partitionOffsets.set(0, currentSize);
+      }
+      finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    private void addPartition() throws HiveException
+    {
+      try
+      {
+        if ( partitions.size() > 0 )
+        {
+          int idx = partitions.size() - 1;
+          ByteBasedList bl = partitions.get(idx);
+          File f;
+          if ( reusableFiles.size() > 0 ) {
+            f = reusableFiles.remove(0);
+          }
+          else {
+            f = File.createTempFile("wdw", null, dir);
+          }
+          PersistentByteBasedList.store(bl, f);
+          partitions.set(idx, new PersistentByteBasedList(f, bl));
+
+        }
+        ByteBasedList bl = new ByteBasedList(currentSize, batchSize);
+        partitions.add(bl);
+        partitionOffsets.add(currentSize);
+      }
+      catch(IOException ie)
+      {
+        throw new HiveException(ie);
+      }
+    }
+
+    private ByteBasedList getPartition(int i) throws HiveException
+    {
+      PTFPersistence.lock(lock.readLock());
+      try
+      {
+        int numSplits = partitions.size();
+        if ( numSplits == 0) {
+          return partitions.get(0);
+        }
+        int start = 0;
+        int end = numSplits - 1;
+
+        while(start < end)
+        {
+          int mid = (start + end + 1) >>> 1;
+          int val = partitionOffsets.get(mid);
+          if ( val == i )
+          {
+            return partitions.get(mid);
+          }
+          else if ( val < i )
+          {
+            if ( end == mid)
+            {
+              return partitions.get(end);
+            }
+            start = mid;
+          }
+          else
+          {
+            end = mid - 1;
+          }
+        }
+        return partitions.get(start);
+      }
+      finally
+      {
+        lock.readLock().unlock();
+      }
+    }
+
+    @Override
+    public void get(int i, Writable wObj) throws HiveException
+    {
+      ByteBasedList bl = getPartition(i);
+      bl.get(i, wObj);
+    }
+
+    @Override
+    public void append(Writable obj) throws HiveException
+    {
+      PTFPersistence.lock(lock.writeLock());
+      try
+      {
+        partitions.get(partitions.size() -1).append(obj);
+        currentSize += 1;
+        lastModified = System.nanoTime();
+      }
+      catch(ListFullException le)
+      {
+        addPartition();
+        append(obj);
+      }
+      finally
+      {
+        lock.writeLock().unlock();
+      }
+
+    }
+
+    @Override
+    public Object get(int i, Deserializer deserializer, Writable wObj) throws HiveException
+    {
+      ByteBasedList bl = getPartition(i);
+      return bl.get(i, deserializer, wObj);
+    }
+
+    @Override
+    public void append(Object obj, ObjectInspector OI, Serializer serializer) throws HiveException
+    {
+      PTFPersistence.lock(lock.writeLock());
+      try
+      {
+        partitions.get(partitions.size() -1).append(obj, OI, serializer);
+        currentSize += 1;
+        lastModified = System.nanoTime();
+      }
+      catch(ListFullException le)
+      {
+        addPartition();
+        append(obj, OI, serializer);
+      }
+      finally
+      {
+        lock.writeLock().unlock();
+      }
+    }
+
+    @Override
+    public Iterator<Writable> iterator(Writable wObj) throws HiveException
+    {
+      return new WIterator(wObj);
+    }
+
+    class WIterator implements Iterator<Writable>
+    {
+      Writable wObj;
+      long checkTime;
+      int i;
+      Iterator<Writable> pIter;
+
+      WIterator(Writable wObj) throws HiveException
+      {
+        this.wObj = wObj;
+        checkTime = lastModified;
+        i = 0;
+        pIter = partitions.get(i).iterator(wObj);
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+        if ( pIter.hasNext() ) {
+          return true;
+        }
+        if (checkTime != lastModified) {
+          throw new ConcurrentModificationException();
+        }
+        try
+        {
+          if ( i < partitions.size() )
+          {
+            pIter = partitions.get(i++).iterator(wObj);
+            return hasNext();
+          }
+          return false;
+        }
+        catch(HiveException e)
+        {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public Writable next()
+      {
+         if (checkTime != lastModified) {
+          throw new ConcurrentModificationException();
+        }
+         return pIter.next();
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    static class ShutdownHook extends Thread
+    {
+      File dir;
+
+      public ShutdownHook(File dir)
+      {
+        this.dir = dir;
+      }
+
+      @Override
+      public void run()
+      {
+        try
+        {
+          PartitionedByteBasedList.deleteRecursively(dir);
+        }
+        catch(IOException ie)
+        {
+        }
+      }
+
+    }
+
+     // copied completely from guavar09 source
+    /**
+     * Deletes a file or directory and all contents recursively.
+     *
+     * <p>
+     * If the file argument is a symbolic link the link will be deleted but not
+     * the target of the link. If the argument is a directory, symbolic links
+     * within the directory will not be followed.
+     *
+     * @param file
+     *            the file to delete
+     * @throws IOException
+     *             if an I/O error occurs
+     * @see #deleteDirectoryContents
+     */
+    public static void deleteRecursively(File file) throws IOException
+    {
+      if (file.isDirectory())
+      {
+        deleteDirectoryContents(file);
+      }
+      if (!file.delete())
+      {
+        throw new IOException("Failed to delete " + file);
+      }
+    }
+
+    // copied completely from guavar09 source
+    /**
+     * Deletes all the files within a directory. Does not delete the directory
+     * itself.
+     *
+     * <p>
+     * If the file argument is a symbolic link or there is a symbolic link in
+     * the path leading to the directory, this method will do nothing. Symbolic
+     * links within the directory are not followed.
+     *
+     * @param directory
+     *            the directory to delete the contents of
+     * @throws IllegalArgumentException
+     *             if the argument is not a directory
+     * @throws IOException
+     *             if an I/O error occurs
+     * @see #deleteRecursively
+     */
+    public static void deleteDirectoryContents(File directory)
+        throws IOException
+    {
+      /*Preconditions.checkArgument(directory.isDirectory(),
+          "Not a directory: %s", directory);
+      */
+      if ( !directory.isDirectory())
+      {
+        throw new IOException(String.format("Not a directory: %s", directory));
+      }
+
+      // Symbolic links will have different canonical and absolute paths
+      if (!directory.getCanonicalPath().equals(directory.getAbsolutePath()))
+      {
+        return;
+      }
+      File[] files = directory.listFiles();
+      if (files == null)
+      {
+        throw new IOException("Error listing files for " + directory);
+      }
+      for (File file : files)
+      {
+        deleteRecursively(file);
+      }
+    }
+
+    // copied completely from guava to remove dependency on guava
+    /** Maximum loop count when creating temp directories. */
+    private static final int TEMP_DIR_ATTEMPTS = 10000;
+    public static File createTempDir()
+    {
+      File baseDir = new File(System.getProperty("java.io.tmpdir"));
+      String baseName = System.currentTimeMillis() + "-";
+
+      for (int counter = 0; counter < TEMP_DIR_ATTEMPTS; counter++)
+      {
+        File tempDir = new File(baseDir, baseName + counter);
+        if (tempDir.mkdir())
+        {
+          return tempDir;
+        }
+      }
+      throw new IllegalStateException("Failed to create directory within "
+          + TEMP_DIR_ATTEMPTS + " attempts (tried " + baseName + "0 to "
+          + baseName + (TEMP_DIR_ATTEMPTS - 1) + ')');
+    }
+
+  }
+
+  static class PersistentByteBasedList extends ByteBasedList
+  {
+    private static int headerSize() { return (Integer.SIZE + Integer.SIZE + Integer.SIZE + Long.SIZE) / Byte.SIZE;}
+    protected static void store(ByteBasedList l, File f) throws IOException
+    {
+      /*
+       * write startOffset:bytesUsed:currentSize:lastModified
+       */
+      int hdrSize = headerSize();
+      ByteBuffer buf = ByteBuffer.allocate(hdrSize);
+
+      buf.putInt(l.startOffset);
+      buf.putInt(l.bytesUsed);
+      buf.putInt(l.currentSize);
+      buf.putLong(l.lastModified);
+      buf.flip();
+
+      /*
+       * note: could save this space by using Memory-Mapped I/O and directly writing to the MM buffer.
+       */
+      ByteBuffer offsetB = ByteBuffer.allocate((Integer.SIZE/Byte.SIZE) * 2 * l.currentSize);
+      IntBuffer iB = offsetB.asIntBuffer();
+      iB.put(l.offsetsArray, 0, l.currentSize * 2);
+
+      ByteBuffer bytesB = ByteBuffer.wrap(l.bytes, 0, l.bytesUsed);
+
+      ByteBuffer[] bufs = new ByteBuffer[] { buf, offsetB, bytesB};
+      FileOutputStream fos = new FileOutputStream(f);
+      try
+      {
+        FileChannel fc = fos.getChannel();
+        while (fc.write(bufs, 0, bufs.length) > 0) {
+          ;
+        }
+      }
+      finally
+      {
+        fos.close();
+      }
+    }
+
+    protected static void load(ByteBasedList l, File f) throws IOException
+    {
+      int hdr = headerSize();
+      FileInputStream fis = new FileInputStream(f);
+      try
+      {
+        FileChannel fc = fis.getChannel();
+        ByteBuffer buf0 = ByteBuffer.allocate(hdr);
+        while (buf0.hasRemaining()) {
+          fc.read(buf0);
+        }
+        buf0.flip();
+        l.startOffset = buf0.getInt();
+        l.bytesUsed = buf0.getInt();
+        l.currentSize = buf0.getInt();
+        l.lastModified = buf0.getLong();
+
+        /*
+         * note: could save this space by using Memory-Mapped I/O and directly writing to the MM buffer.
+         */
+        ByteBuffer offsetB = ByteBuffer.allocate((Integer.SIZE/Byte.SIZE) * 2 * l.currentSize);
+        ByteBuffer bytesB = ByteBuffer.allocate(l.bytesUsed);
+        ByteBuffer[] bufs = new ByteBuffer[] { offsetB, bytesB };
+        while (fc.read(bufs) > 0) {
+          ;
+        }
+
+        l.offsetsArray = new int[l.currentSize * 2];
+        offsetB.flip();
+        IntBuffer iB = offsetB.asIntBuffer();
+        iB.get(l.offsetsArray);
+        l.bytes = bytesB.array();
+      }
+      finally
+      {
+        fis.close();
+      }
+    }
+
+    File file;
+    SoftReference<ByteBasedList> memList;
+
+    protected PersistentByteBasedList(File file, ByteBasedList l)
+    {
+      super(file);
+      this.file = file;
+      memList = new SoftReference<ByteBasedList>(l);
+    }
+
+    protected PersistentByteBasedList(File file)
+    {
+      this(file, null);
+    }
+
+    @Override
+    protected void reset(int startOffset) throws HiveException {
+      throw new HiveException("Reset on PersistentByteBasedList not supported");
+    }
+
+    private ByteBasedList getList() throws HiveException
+    {
+      PTFPersistence.lock(lock.readLock());
+      try
+      {
+        ByteBasedList list = memList.get();
+        if (list == null)
+        {
+          try
+          {
+            list = new ByteBasedList(file);
+            load(list, file);
+            memList = new SoftReference<ByteBasedList>(list);
+          }
+          catch(Exception ie)
+          {
+            throw new RuntimeException(ie);
+          }
+        }
+        return list;
+      }
+      finally
+      {
+        lock.readLock().unlock();
+      }
+    }
+
+    File getFile() {
+      return file;
+    }
+
+    @Override
+    public int size() throws HiveException
+    {
+      return getList().size();
+    }
+
+    @Override
+    public void get(int i, Writable wObj) throws HiveException
+    {
+      getList().get(i, wObj);
+    }
+
+    @Override
+    public void append(Writable obj) throws HiveException
+    {
+      throw new UnsupportedOperationException("Cannot append to a Persisted List");
+    }
+
+    @Override
+    public Object get(int i, Deserializer deserializer, Writable wObj) throws HiveException
+    {
+      return getList().get(i, deserializer, wObj);
+    }
+
+    @Override
+    public void append(Object obj, ObjectInspector OI, Serializer serializer) throws HiveException
+    {
+      throw new UnsupportedOperationException("Cannot append to a Persisted List");
+    }
+
+    @Override
+    public Iterator<Writable> iterator(Writable wObj) throws HiveException
+    {
+      return getList().iterator(wObj);
+    }
+
+    @Override
+    public Iterator<Object> iterator(Deserializer deserializer, Writable wObj) throws HiveException
+    {
+      return getList().iterator(deserializer, wObj);
+    }
+
+    @Override
+    public void dump(StringBuilder bldr, Writable wObj) throws IOException, HiveException
+    {
+      getList().dump(bldr, wObj);
+    }
+
+    @Override
+    public void dump(StringBuilder bldr, Deserializer deserializer, Writable wObj) throws IOException, HiveException
+    {
+      getList().dump(bldr, deserializer, wObj);
+    }
+  }
+
+  public static class ByteBufferInputStream extends InputStream
+  {
+    ByteBuffer buffer;
+    int mark = -1;
+
+    public void intialize(ByteBuffer buffer)
+    {
+      this.buffer = buffer;
+    }
+
+    public void intialize(ByteBuffer buffer, int off, int len)
+    {
+      buffer = buffer.duplicate();
+      buffer.position(off);
+      buffer.limit(off + len);
+      this.buffer = buffer.slice();
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+      return buffer.hasRemaining() ? (buffer.get() & 0xff) : -1;
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException
+    {
+      int remaining = buffer.remaining();
+      len= len <= remaining ? len : remaining;
+      buffer.get(b, off, len);
+      return len;
+    }
+
+    @Override
+    public boolean markSupported() { return true; }
+
+    @Override
+    public void mark(int readAheadLimit)
+    {
+      mark = buffer.position();
+    }
+
+    @Override
+    public void reset()
+    {
+      if ( mark == -1 ) {
+        throw new IllegalStateException();
+      }
+      buffer.position(mark);
+      mark = -1;
+    }
+  }
+
+  public static class ByteBufferOutputStream extends OutputStream
+  {
+    ByteBuffer buffer;
+
+    public void intialize(ByteBuffer buffer)
+    {
+      this.buffer = buffer;
+    }
+
+    public void intialize(ByteBuffer buffer, int off, int len)
+    {
+      buffer = buffer.duplicate();
+      buffer.position(off);
+      buffer.limit(off + len);
+      this.buffer = buffer.slice();
+    }
+
+    @Override
+    public void write(int b) throws IOException
+    {
+      buffer.put((byte) b);
+    }
+
+    @Override
+    public void write(byte b[], int off, int len)
+    {
+      int remaining = buffer.remaining();
+      if ( len > remaining )
+      {
+        throw new IndexOutOfBoundsException();
+      }
+      buffer.put(b, off, len);
+    }
+  }
+
+  public static ThreadLocal<ByteArrayIS> bis = new ThreadLocal<ByteArrayIS>()
+  {
+    @Override
+    protected ByteArrayIS initialValue()
+    {
+      return new ByteArrayIS();
+    }
+  };
+
+  public static ThreadLocal<DataIStream> dis = new ThreadLocal<DataIStream>()
+  {
+    @Override
+    protected DataIStream initialValue()
+    {
+      return new DataIStream(bis.get());
+    }
+  };
+
+  public static ThreadLocal<ByteArrayOS> bos = new ThreadLocal<ByteArrayOS>()
+  {
+    @Override
+    protected ByteArrayOS initialValue()
+    {
+      return new ByteArrayOS();
+    }
+  };
+
+  public static ThreadLocal<DataOStream> dos = new ThreadLocal<DataOStream>()
+  {
+    @Override
+    protected DataOStream initialValue()
+    {
+      return new DataOStream(bos.get());
+    }
+  };
+
+
+  public static class DataIStream extends DataInputStream
+  {
+    public DataIStream(ByteArrayIS in)
+    {
+      super(in);
+    }
+
+    public ByteArrayIS getUnderlyingStream() { return (ByteArrayIS) in; }
+  }
+
+  public static class DataOStream extends DataOutputStream
+  {
+    public DataOStream(ByteArrayOS out)
+    {
+      super(out);
+    }
+
+    public ByteArrayOS getUnderlyingStream() { return (ByteArrayOS) out; }
+  }
+
+  public static class ByteArrayOS extends ByteArrayOutputStream
+  {
+    public ByteArrayOS() { super(); }
+    public ByteArrayOS(int size) { super(size); }
+    public final byte[] bytearray() { return buf; }
+    public final int len() { return count; }
+
+  }
+
+  public static class ByteArrayIS extends ByteArrayInputStream
+  {
+    public ByteArrayIS() { super(new byte[0]); }
+    public final byte[] bytearray() { return buf; }
+    public final void setBuffer(byte[] buf, int offset, int len)
+    {
+      this.buf = buf;
+          this.pos = offset;
+          this.count = Math.min(offset + len, buf.length);
+          this.mark = offset;
+    }
+
+  }
+
+  public static void lock(Lock lock) throws HiveException
+  {
+    try
+    {
+      lock.lockInterruptibly();
+
+    }
+    catch(InterruptedException ie)
+    {
+      Thread.currentThread().interrupt();
+      throw new HiveException("Operation interrupted", ie);
+    }
+  }
+
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.beans.BeanInfo;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PersistenceDelegate;
+import java.beans.PropertyDescriptor;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.antlr.runtime.CommonToken;
+import org.antlr.runtime.tree.BaseTree;
+import org.antlr.runtime.tree.CommonTree;
+import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+public class PTFUtils {
+
+  public static String toString(List<?> col)
+  {
+    StringBuilder buf = new StringBuilder();
+    buf.append("[");
+    boolean first = true;
+    for (Object o : col)
+    {
+      if (first) {
+        first = false;
+      } else {
+        buf.append(", ");
+      }
+      buf.append(o.toString());
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+
+  public static String toString(Map<?, ?> col)
+  {
+    StringBuilder buf = new StringBuilder();
+    buf.append("[");
+    boolean first = true;
+    for (Map.Entry<?, ?> o : col.entrySet())
+    {
+      if (first) {
+        first = false;
+      } else {
+        buf.append(", ");
+      }
+      buf.append(o.getKey().toString()).append(" : ")
+          .append(o.getValue().toString());
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+
+  public static String unescapeQueryString(String qry)
+  {
+    qry = qry.replace("\\\"", "\"");
+    qry = qry.replace("\\'", "'");
+    return qry;
+  }
+
+  public static class ReverseIterator<T> implements Iterator<T>
+  {
+    Stack<T> stack;
+
+    public ReverseIterator(Iterator<T> it)
+    {
+      stack = new Stack<T>();
+      while (it.hasNext())
+      {
+        stack.push(it.next());
+      }
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return !stack.isEmpty();
+    }
+
+    @Override
+    public T next()
+    {
+      return stack.pop();
+    }
+
+    @Override
+    public void remove()
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static abstract class Predicate<T>
+  {
+    public abstract boolean apply(T obj);
+  };
+
+
+
+  /*
+   * serialization functions
+   */
+  public static void serialize(OutputStream out, Object o)
+  {
+    XMLEncoder e = new XMLEncoder(out);
+    e.setExceptionListener(new EL());
+    PTFUtils.addPersistenceDelegates(e);
+    e.writeObject(o);
+    e.close();
+  }
+
+  public static Object deserialize(InputStream in1)
+  {
+    XMLDecoder d = null;
+    try
+    {
+      d = new XMLDecoder(in1, null, null);
+      return d.readObject();
+    }
+    finally
+    {
+      if (null != d)
+      {
+        d.close();
+      }
+    }
+  }
+
+  public static void addPersistenceDelegates(XMLEncoder e)
+  {
+    addAntlrPersistenceDelegates(e);
+    addHivePersistenceDelegates(e);
+    addEnumDelegates(e);
+  }
+
+  public static void addEnumDelegates(XMLEncoder e)
+  {
+    e.setPersistenceDelegate(Direction.class, new EnumDelegate());
+  }
+
+  public static void addAntlrPersistenceDelegates(XMLEncoder e)
+  {
+    e.setPersistenceDelegate(ASTNode.class, new PersistenceDelegate()
+    {
+
+      @Override
+      protected Expression instantiate(Object oldInstance, Encoder out)
+      {
+        return new Expression(oldInstance, oldInstance.getClass(),
+            "new", new Object[]
+            { ((ASTNode) oldInstance).getToken() });
+      }
+    });
+    e.setPersistenceDelegate(CommonTree.class, new PersistenceDelegate()
+    {
+      @Override
+      protected Expression instantiate(Object oldInstance, Encoder out)
+      {
+        return new Expression(oldInstance, oldInstance.getClass(),
+            "new", new Object[]
+            { ((CommonTree) oldInstance).getToken() });
+      }
+    });
+    e.setPersistenceDelegate(BaseTree.class, new PersistenceDelegate()
+    {
+      @Override
+      protected Expression instantiate(Object oldInstance, Encoder out)
+      {
+        return new Expression(oldInstance, oldInstance.getClass(),
+            "new", new Object[]
+            {});
+      }
+
+      @Override
+      @SuppressWarnings("rawtypes")
+      protected void initialize(Class type, Object oldInstance,
+          Object newInstance, Encoder out)
+      {
+        super.initialize(type, oldInstance, newInstance, out);
+
+        BaseTree t = (BaseTree) oldInstance;
+
+        for (int i = 0; i < t.getChildCount(); i++)
+        {
+          out.writeStatement(new Statement(oldInstance, "addChild",
+              new Object[]
+              { t.getChild(i) }));
+        }
+      }
+    });
+    e.setPersistenceDelegate(CommonToken.class, new PersistenceDelegate()
+    {
+      @Override
+      protected Expression instantiate(Object oldInstance, Encoder out)
+      {
+        return new Expression(oldInstance, oldInstance.getClass(),
+            "new", new Object[]
+            { ((CommonToken) oldInstance).getType(),
+                ((CommonToken) oldInstance).getText() });
+      }
+    });
+  }
+
+  public static void addHivePersistenceDelegates(XMLEncoder e)
+  {
+    e.setPersistenceDelegate(PrimitiveTypeInfo.class,
+        new PersistenceDelegate()
+        {
+          @Override
+          protected Expression instantiate(Object oldInstance,
+              Encoder out)
+          {
+            return new Expression(oldInstance,
+                TypeInfoFactory.class, "getPrimitiveTypeInfo",
+                new Object[]
+                { ((PrimitiveTypeInfo) oldInstance)
+                    .getTypeName() });
+          }
+        });
+  }
+
+  static class EL implements ExceptionListener
+  {
+    public void exceptionThrown(Exception e)
+    {
+      e.printStackTrace();
+      throw new RuntimeException("Cannot serialize the query plan", e);
+    }
+  }
+
+  public static void makeTransient(Class<?> beanClass, String pdName)
+  {
+    BeanInfo info;
+    try
+    {
+      info = Introspector.getBeanInfo(beanClass);
+      PropertyDescriptor[] propertyDescriptors = info
+          .getPropertyDescriptors();
+      for (int i = 0; i < propertyDescriptors.length; ++i)
+      {
+        PropertyDescriptor pd = propertyDescriptors[i];
+        if (pd.getName().equals(pdName))
+        {
+          pd.setValue("transient", Boolean.TRUE);
+        }
+      }
+    }
+    catch (IntrospectionException ie)
+    {
+      throw new RuntimeException(ie);
+    }
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+@Documented
+public @interface PartitionTableFunctionDescription
+{
+	Description description ();
+
+	/**
+	 * if true it is not usable in the language. {@link WindowingTableFunction} is the only internal function.
+	 */
+	boolean isInternal() default false;
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+@Documented
+public @interface WindowFunctionDescription
+{
+	Description description ();
+	/**
+	 * controls whether this function can be applied to a Window.
+	 * <p>
+	 * Ranking function: Rank, Dense_Rank, Percent_Rank and Cume_Dist don't operate on Windows.
+	 * Why? a window specification implies a row specific range i.e. every row gets its own set of rows to process the UDAF on.
+	 * For ranking defining a set of rows for every row makes no sense.
+	 * <p>
+	 * All other UDAFs can be computed for a Window.
+	 */
+	boolean supportsWindow() default true;
+	/**
+	 * A WindowFunc is implemented as {@link GenericUDAFResolver2}. It returns only one value.
+	 * If this is true then the function must return a List which is taken to be the column for this function in the Output table returned by the
+	 * {@link WindowingTableFunction}. Otherwise the output is assumed to be a single value, the column of the Output will contain the same value
+	 * for all the rows.
+	 */
+	boolean pivotResult() default false;
+}
+

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
+
+@SuppressWarnings("deprecation")
+public class WindowFunctionInfo
+{
+	boolean supportsWindow = true;
+	boolean pivotResult = false;
+	FunctionInfo fInfo;
+
+	WindowFunctionInfo(FunctionInfo fInfo)
+	{
+		assert fInfo.isGenericUDAF();
+		this.fInfo = fInfo;
+		Class<? extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass();
+		WindowFunctionDescription def = wfnCls.getAnnotation(WindowFunctionDescription.class);
+		if ( def != null)
+		{
+			supportsWindow = def.supportsWindow();
+			pivotResult = def.pivotResult();
+		}
+	}
+
+	public boolean isSupportsWindow()
+	{
+		return supportsWindow;
+	}
+
+	public boolean isPivotResult()
+	{
+		return pivotResult;
+	}
+
+	public FunctionInfo getfInfo()
+	{
+		return fInfo;
+	}
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java Tue Apr  2 14:16:34 2013
@@ -47,7 +47,8 @@ public class LineageInfo implements Seri
    *                 set operations like union on columns on other tables
    *                 e.g. T2.c1 = T1.c1 + T3.c1.
    * 4. SCRIPT - Indicates that the column is derived from the output
-   *             of a user script through a TRANSFORM, MAP or REDUCE syntax.
+   *             of a user script through a TRANSFORM, MAP or REDUCE syntax
+   *             or from the output of a PTF chain execution.
    */
   public static enum DependencyType {
     SIMPLE, EXPRESSION, SCRIPT

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Tue Apr  2 14:16:34 2013
@@ -23,19 +23,19 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -114,6 +114,9 @@ public class ColumnPruner implements Tra
     opRules.put(new RuleRegExp("R9",
       LateralViewForwardOperator.getOperatorName() + "%"),
       ColumnPrunerProcFactory.getLateralViewForwardProc());
+    opRules.put(new RuleRegExp("R10",
+        PTFOperator.getOperatorName() + "%"),
+        ColumnPrunerProcFactory.getPTFProc());
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Tue Apr  2 14:16:34 2013
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.La
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.ScriptOperator;
@@ -62,11 +66,21 @@ import org.apache.hadoop.hive.ql.plan.Gr
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
  * Factory for generating the different node processors used by ColumnPruner.
@@ -148,6 +162,170 @@ public final class ColumnPrunerProcFacto
   }
 
   /**
+   * - Pruning can only be done for Windowing. PTFs are black boxes,
+   * we assume all columns are needed.
+   * - add column names referenced in WindowFn args and in WindowFn expressions
+   * to the pruned list of the child Select Op.
+   * - Prune the Column names & types serde properties in each of the Shapes in the PTF Chain:
+   *    - the InputDef's output shape
+   *    - Window Tabl Functions: window output shape & output shape.
+   * - Why is pruning the Column names & types in the serde properties enough?
+   *   - because during runtime we rebuild the OIs using these properties.
+   * - finally we set the prunedColList on the ColumnPrunerContx;
+   * and update the RR & signature on the PTFOp.
+   */
+  public static class ColumnPrunerPTFProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+
+      PTFOperator op = (PTFOperator) nd;
+      PTFDesc conf = op.getConf();
+      //Since we cannot know what columns will be needed by a PTF chain,
+      //we do not prune columns on PTFOperator for PTF chains.
+      if (!conf.forWindowing()) {
+        return getDefaultProc().process(nd, stack, ctx, nodeOutputs);
+      }
+
+      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
+      WindowTableFunctionDef def = (WindowTableFunctionDef) conf.getFuncDef();
+      ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
+
+      List<String> prunedCols = cppCtx.getPrunedColList(op.getChildOperators().get(0));
+      //we create a copy of prunedCols to create a list of pruned columns for PTFOperator
+      prunedCols = new ArrayList<String>(prunedCols);
+      prunedColumnsList(prunedCols, def);
+      setSerdePropsOfShape(def.getInput().getOutputShape(), prunedCols);
+      setSerdePropsOfShape(def.getOutputFromWdwFnProcessing(), prunedCols);
+      setSerdePropsOfShape(def.getOutputShape(), prunedCols);
+
+      RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
+      RowResolver newRR = buildPrunedRR(prunedCols, oldRR, sig);
+      cppCtx.getPrunedColLists().put(op, prunedInputList(prunedCols, def));
+      cppCtx.getOpToParseCtxMap().get(op).setRowResolver(newRR);
+      op.getSchema().setSignature(sig);
+      return null;
+    }
+
+    private static RowResolver buildPrunedRR(List<String> prunedCols,
+        RowResolver oldRR, ArrayList<ColumnInfo> sig) throws SemanticException{
+      RowResolver newRR = new RowResolver();
+      HashSet<String> prunedColsSet = new HashSet<String>(prunedCols);
+      for(ColumnInfo cInfo : oldRR.getRowSchema().getSignature()) {
+        if ( prunedColsSet.contains(cInfo.getInternalName())) {
+          String[] nm = oldRR.reverseLookup(cInfo.getInternalName());
+          newRR.put(nm[0], nm[1], cInfo);
+          sig.add(cInfo);
+        }
+      }
+      return newRR;
+    }
+
+    /*
+     * add any input columns referenced in WindowFn args or expressions.
+     */
+    private void prunedColumnsList(List<String> prunedCols, WindowTableFunctionDef tDef) {
+      if ( tDef.getWindowFunctions() != null ) {
+        for(WindowFunctionDef wDef : tDef.getWindowFunctions() ) {
+          if ( wDef.getArgs() == null) {
+            continue;
+          }
+          for(PTFExpressionDef arg : wDef.getArgs()) {
+            ExprNodeDesc exprNode = arg.getExprNode();
+            Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+          }
+        }
+      }
+      if ( tDef.getWindowExpressions() != null ) {
+        for(WindowExpressionDef expr : tDef.getWindowExpressions()) {
+          ExprNodeDesc exprNode = expr.getExprNode();
+          Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+        }
+      }
+     if(tDef.getPartition() != null){
+         for(PTFExpressionDef col : tDef.getPartition().getExpressions()){
+           ExprNodeDesc exprNode = col.getExprNode();
+           Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+         }
+       }
+       if(tDef.getOrder() != null){
+         for(PTFExpressionDef col : tDef.getOrder().getExpressions()){
+           ExprNodeDesc exprNode = col.getExprNode();
+           Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+         }
+       }
+    }
+
+    private List<String> getLowerCasePrunedCols(List<String> prunedCols){
+      List<String> lowerCasePrunedCols = new ArrayList<String>();
+      for (String col : prunedCols) {
+        lowerCasePrunedCols.add(col.toLowerCase());
+      }
+      return lowerCasePrunedCols;
+    }
+
+    /*
+     * reconstruct Column names & types list based on the prunedCols list.
+     */
+    private void setSerdePropsOfShape(ShapeDetails shp, List<String> prunedCols) {
+      List<String> columnNames = Arrays.asList(shp.getSerdeProps().get(
+          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS).split(","));
+      List<TypeInfo> columnTypes = TypeInfoUtils
+          .getTypeInfosFromTypeString(shp.getSerdeProps().get(
+              org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES));
+      /*
+       * fieldNames in OI are lower-cased. So we compare lower cased names for now.
+       */
+      prunedCols = getLowerCasePrunedCols(prunedCols);
+
+      StringBuilder cNames = new StringBuilder();
+      StringBuilder cTypes = new StringBuilder();
+
+      boolean addComma = false;
+      for(int i=0; i < columnNames.size(); i++) {
+        if ( prunedCols.contains(columnNames.get(i)) ) {
+          cNames.append(addComma ? "," : "");
+          cTypes.append(addComma ? "," : "");
+          cNames.append(columnNames.get(i));
+          cTypes.append(columnTypes.get(i));
+          addComma = true;
+        }
+      }
+      shp.getSerdeProps().put(
+          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, cNames.toString());
+      shp.getSerdeProps().put(
+          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, cTypes.toString());
+    }
+
+    /*
+     * from the prunedCols list filter out columns that refer to WindowFns or WindowExprs
+     * the returned list is set as the prunedList needed by the PTFOp.
+     */
+    private ArrayList<String> prunedInputList(List<String> prunedCols,
+        WindowTableFunctionDef tDef) {
+      ArrayList<String> prunedInputCols = new ArrayList<String>();
+
+      StructObjectInspector OI = tDef.getInput().getOutputShape().getOI();
+      for(StructField f : OI.getAllStructFieldRefs()) {
+        String fName = f.getFieldName();
+        if ( prunedCols.contains(fName)) {
+          prunedInputCols.add(fName);
+        }
+      }
+
+      return prunedInputCols;
+    }
+  }
+
+  /**
+   * Factory method to get the ColumnPrunerGroupByProc class.
+   *
+   * @return ColumnPrunerGroupByProc
+   */
+  public static ColumnPrunerPTFProc getPTFProc() {
+    return new ColumnPrunerPTFProc();
+  }
+
+  /**
    * The Default Node Processor for Column Pruning.
    */
   public static class ColumnPrunerDefaultProc implements NodeProcessor {
@@ -285,6 +463,39 @@ public final class ColumnPrunerProcFacto
         }
         Collections.sort(colLists);
         pruneReduceSinkOperator(flags, op, cppCtx);
+      } else if ((childOperators.size() == 1)
+          && (childOperators.get(0) instanceof ExtractOperator )
+          && (childOperators.get(0).getChildOperators().size() == 1)
+          && (childOperators.get(0).getChildOperators().get(0) instanceof PTFOperator )
+          && ((PTFOperator)childOperators.get(0).
+              getChildOperators().get(0)).getConf().forWindowing() )  {
+
+        /*
+         * For RS that are followed by Extract & PTFOp for windowing
+         * - do the same thing as above. Reconstruct ValueColumn list based on what is required
+         *   by the PTFOp.
+         */
+
+        assert parentOperators.size() == 1;
+
+        PTFOperator ptfOp = (PTFOperator) childOperators.get(0).getChildOperators().get(0);
+        List<String> childCols = cppCtx.getPrunedColList(ptfOp);
+        boolean[] flags = new boolean[conf.getValueCols().size()];
+        for (int i = 0; i < flags.length; i++) {
+          flags[i] = false;
+        }
+        if (childCols != null && childCols.size() > 0) {
+          ArrayList<String> outColNames = op.getConf().getOutputValueColumnNames();
+          for(int i=0; i < outColNames.size(); i++ ) {
+            if ( childCols.contains(outColNames.get(i))) {
+              ExprNodeDesc exprNode = op.getConf().getValueCols().get(i);
+              flags[i] = true;
+              Utilities.mergeUniqElems(colLists, exprNode.getCols());
+            }
+          }
+        }
+        Collections.sort(colLists);
+        pruneReduceSinkOperator(flags, op, cppCtx);
       } else {
         // Reduce Sink contains the columns needed - no need to aggregate from
         // children
@@ -831,4 +1042,4 @@ public final class ColumnPrunerProcFacto
     return new ColumnPrunerMapJoinProc();
   }
 
-}
+}
\ No newline at end of file

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java Tue Apr  2 14:16:34 2013
@@ -22,17 +22,17 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -82,6 +82,8 @@ public class Generator implements Transf
       OpProcFactory.getReduceSinkProc());
     opRules.put(new RuleRegExp("R9", LateralViewJoinOperator.getOperatorName() + "%"),
       OpProcFactory.getLateralViewJoinProc());
+    opRules.put(new RuleRegExp("R10", PTFOperator.getOperatorName() + "%"),
+        OpProcFactory.getTransformProc());
 
     // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(), opRules, lCtx);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java Tue Apr  2 14:16:34 2013
@@ -37,10 +37,6 @@ public class ASTNode extends CommonTree 
   public ASTNode() {
   }
 
-  public ASTNode(ASTNode copy){
-    super(copy);
-  }
-
   /**
    * Constructor.
    *
@@ -51,6 +47,16 @@ public class ASTNode extends CommonTree 
     super(t);
   }
 
+  public ASTNode(ASTNode node) {
+    super(node);
+    this.origin = node.origin;
+  }
+
+  @Override
+  public Tree dupNode() {
+    return new ASTNode(this);
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -95,12 +101,6 @@ public class ASTNode extends CommonTree 
     this.origin = origin;
   }
 
-  @Override
-  public Tree dupNode() {
-
-    return new ASTNode(this);
-  }
-
   public String dump() {
     StringBuilder sb = new StringBuilder();
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g Tue Apr  2 14:16:34 2013
@@ -139,7 +139,7 @@ fromSource
 @init { gParent.msgs.push("from source"); }
 @after { gParent.msgs.pop(); }
     :
-    (tableSource | subQuerySource) (lateralView^)*
+    ((Identifier LPAREN)=> partitionedTableFunction | tableSource | subQuerySource) (lateralView^)*
     ;
 
 tableBucketSample
@@ -202,6 +202,38 @@ subQuerySource
     LPAREN queryStatementExpression RPAREN identifier -> ^(TOK_SUBQUERY queryStatementExpression identifier)
     ;
 
+//---------------------- Rules for parsing PTF clauses -----------------------------
+partitioningSpec
+@init { gParent.msgs.push("partitioningSpec clause"); }
+@after { gParent.msgs.pop(); } 
+   :
+   partitionByClause orderByClause? -> ^(TOK_PARTITIONINGSPEC partitionByClause orderByClause?) |
+   orderByClause -> ^(TOK_PARTITIONINGSPEC orderByClause) |
+   distributeByClause sortByClause? -> ^(TOK_PARTITIONINGSPEC distributeByClause sortByClause?) |
+   sortByClause -> ^(TOK_PARTITIONINGSPEC sortByClause) |
+   clusterByClause -> ^(TOK_PARTITIONINGSPEC clusterByClause)
+   ;
+
+partitionTableFunctionSource
+@init { gParent.msgs.push("partitionTableFunctionSource clause"); }
+@after { gParent.msgs.pop(); } 
+   :
+   subQuerySource |
+   tableSource |
+   partitionedTableFunction
+   ;
+
+partitionedTableFunction
+@init { gParent.msgs.push("ptf clause"); }
+@after { gParent.msgs.pop(); } 
+   :
+   name=Identifier
+   LPAREN KW_ON ptfsrc=partitionTableFunctionSource partitioningSpec?
+     ((Identifier LPAREN expression RPAREN ) => Identifier LPAREN expression RPAREN ( COMMA Identifier LPAREN expression RPAREN)*)? 
+   RPAREN alias=Identifier?
+   ->   ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+   ; 
+
 //----------------------- Rules for parsing whereClause -----------------------------
 // where a=b and ...
 whereClause

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g Tue Apr  2 14:16:34 2013
@@ -242,6 +242,14 @@ KW_ROLLUP: 'ROLLUP';
 KW_CUBE: 'CUBE';
 KW_DIRECTORIES: 'DIRECTORIES';
 KW_FOR: 'FOR';
+KW_WINDOW: 'WINDOW';
+KW_UNBOUNDED: 'UNBOUNDED';
+KW_PRECEDING: 'PRECEDING';
+KW_FOLLOWING: 'FOLLOWING';
+KW_CURRENT: 'CURRENT';
+KW_LESS: 'LESS';
+KW_MORE: 'MORE';
+KW_OVER: 'OVER';
 KW_GROUPING: 'GROUPING';
 KW_SETS: 'SETS';
 KW_TRUNCATE: 'TRUNCATE';

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Tue Apr  2 14:16:34 2013
@@ -288,6 +288,12 @@ TOK_SKEWED_LOCATIONS;
 TOK_SKEWED_LOCATION_LIST;
 TOK_SKEWED_LOCATION_MAP;
 TOK_STOREDASDIRS;
+TOK_PARTITIONINGSPEC;
+TOK_PTBLFUNCTION;
+TOK_WINDOWDEF;
+TOK_WINDOWSPEC;
+TOK_WINDOWVALUES;
+TOK_WINDOWRANGE;
 TOK_IGNOREPROTECTION;
 }
 
@@ -1792,9 +1798,10 @@ regular_body
    clusterByClause?
    distributeByClause?
    sortByClause?
+   window_clause?
    limitClause? -> ^(TOK_QUERY fromClause ^(TOK_INSERT insertClause
                      selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
-                     distributeByClause? sortByClause? limitClause?))
+                     distributeByClause? sortByClause? window_clause? limitClause?))
    |
    selectStatement
    ;
@@ -1810,9 +1817,10 @@ selectStatement
    clusterByClause?
    distributeByClause?
    sortByClause?
+   window_clause?
    limitClause? -> ^(TOK_QUERY fromClause ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
                      selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
-                     distributeByClause? sortByClause? limitClause?))
+                     distributeByClause? sortByClause? window_clause? limitClause?))
    ;
 
 
@@ -1827,9 +1835,10 @@ body
    clusterByClause?
    distributeByClause?
    sortByClause?
+   window_clause?
    limitClause? -> ^(TOK_INSERT insertClause?
                      selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
-                     distributeByClause? sortByClause? limitClause?)
+                     distributeByClause? sortByClause? window_clause? limitClause?)
    |
    selectClause
    whereClause?
@@ -1839,9 +1848,10 @@ body
    clusterByClause?
    distributeByClause?
    sortByClause?
+   window_clause?
    limitClause? -> ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
                      selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
-                     distributeByClause? sortByClause? limitClause?)
+                     distributeByClause? sortByClause? window_clause? limitClause?)
    ;
 
 insertClause

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g Tue Apr  2 14:16:34 2013
@@ -124,7 +124,18 @@ clusterByClause
     |
     KW_CLUSTER KW_BY
     expression
-    ( COMMA expression )* -> ^(TOK_CLUSTERBY expression+)
+    ( (COMMA)=>COMMA expression )* -> ^(TOK_CLUSTERBY expression+)
+    ;
+
+partitionByClause
+@init  { gParent.msgs.push("partition by clause"); }
+@after { gParent.msgs.pop(); }
+    :
+    KW_PARTITION KW_BY
+    LPAREN expression (COMMA expression)* RPAREN -> ^(TOK_DISTRIBUTEBY expression+)
+    |
+    KW_PARTITION KW_BY
+    expression ((COMMA)=> COMMA expression)* -> ^(TOK_DISTRIBUTEBY expression+)
     ;
 
 distributeByClause
@@ -135,7 +146,7 @@ distributeByClause
     LPAREN expression (COMMA expression)* RPAREN -> ^(TOK_DISTRIBUTEBY expression+)
     |
     KW_DISTRIBUTE KW_BY
-    expression (COMMA expression)* -> ^(TOK_DISTRIBUTEBY expression+)
+    expression ((COMMA)=> COMMA expression)* -> ^(TOK_DISTRIBUTEBY expression+)
     ;
 
 sortByClause
@@ -148,7 +159,7 @@ sortByClause
     |
     KW_SORT KW_BY
     columnRefOrder
-    ( COMMA columnRefOrder)* -> ^(TOK_SORTBY columnRefOrder+)
+    ( (COMMA)=> COMMA columnRefOrder)* -> ^(TOK_SORTBY columnRefOrder+)
     ;
 
 // fun(par1, par2, par3)

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+
+public class PTFInvocationSpec {
+
+  PartitionedTableFunctionSpec function;
+
+  public PartitionedTableFunctionSpec getFunction() {
+    return function;
+  }
+
+  public void setFunction(PartitionedTableFunctionSpec function) {
+    this.function = function;
+  }
+
+  public PartitionedTableFunctionSpec getStartOfChain() {
+    return function == null ? null : function.getStartOfChain();
+  }
+
+  public String getQueryInputName() {
+    return function == null ? null : function.getQueryInputName();
+  }
+
+  public PTFQueryInputSpec getQueryInput() {
+    return function == null ? null : function.getQueryInput();
+  }
+
+  /*
+   * A PTF Input represents the input to a PTF Function. An Input can be a Hive SubQuery or Table
+   * or another PTF Function. An Input instance captures the ASTNode that this instance was created from.
+   */
+  public abstract static class PTFInputSpec {
+    ASTNode astNode;
+
+    public ASTNode getAstNode() {
+      return astNode;
+    }
+
+    public void setAstNode(ASTNode astNode) {
+      this.astNode = astNode;
+    }
+
+    public abstract PTFInputSpec getInput();
+
+    public abstract String getQueryInputName();
+    public abstract PTFQueryInputSpec getQueryInput();
+  }
+
+  public static enum PTFQueryInputType {
+    TABLE,
+    SUBQUERY,
+    PTFCOMPONENT,
+    WINDOWING;
+  }
+
+  /*
+   * A PTF input that represents a source in the overall Query. This could be a Table or a SubQuery.
+   * If a PTF chain requires execution by multiple PTF Operators;
+   * then the original Invocation object is decomposed into a set of Component Invocations.
+   * Every component Invocation but the first one ends in a PTFQueryInputSpec instance.
+   * During the construction of the Operator plan a PTFQueryInputSpec object in the chain implies connect the PTF Operator to the
+   * 'input' i.e. has been generated so far.
+   */
+  public static class PTFQueryInputSpec extends PTFInputSpec {
+    String source;
+    PTFQueryInputType type;
+
+    public String getSource() {
+      return source;
+    }
+    public void setSource(String source) {
+      this.source = source;
+    }
+    public PTFQueryInputType getType() {
+      return type;
+    }
+    public void setType(PTFQueryInputType type) {
+      this.type = type;
+    }
+
+    @Override
+    public PTFInputSpec getInput() {
+      return null;
+    }
+
+    @Override
+    public String getQueryInputName() {
+      return getSource();
+    }
+    @Override
+    public PTFQueryInputSpec getQueryInput() {
+      return this;
+    }
+  }
+
+  /*
+   * Represents a PTF Invocation. Captures:
+   * - function name and alias
+   * - the Partitioning details about its input
+   * - its arguments. The ASTNodes representing the arguments are captured here.
+   * - a reference to its Input
+   */
+  public static class PartitionedTableFunctionSpec  extends PTFInputSpec {
+    String name;
+    String alias;
+    ArrayList<ASTNode> args;
+    PartitioningSpec partitioning;
+    PTFInputSpec input;
+    public String getName() {
+      return name;
+    }
+    public void setName(String name) {
+      this.name = name;
+    }
+    public String getAlias() {
+      return alias;
+    }
+    public void setAlias(String alias) {
+      this.alias = alias;
+    }
+    public ArrayList<ASTNode> getArgs() {
+      return args;
+    }
+    public void setArgs(ArrayList<ASTNode> args) {
+      this.args = args;
+    }
+    public PartitioningSpec getPartitioning() {
+      return partitioning;
+    }
+    public void setPartitioning(PartitioningSpec partitioning) {
+      this.partitioning = partitioning;
+    }
+    @Override
+    public PTFInputSpec getInput() {
+      return input;
+    }
+    public void setInput(PTFInputSpec input) {
+      this.input = input;
+    }
+    public PartitionSpec getPartition() {
+      return getPartitioning() == null ? null : getPartitioning().getPartSpec();
+    }
+    public void setPartition(PartitionSpec partSpec) {
+      partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+      partitioning.setPartSpec(partSpec);
+    }
+    public OrderSpec getOrder() {
+      return getPartitioning() == null ? null : getPartitioning().getOrderSpec();
+    }
+    public void setOrder(OrderSpec orderSpec) {
+      partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+      partitioning.setOrderSpec(orderSpec);
+    }
+    public void addArg(ASTNode arg)
+    {
+      args = args == null ? new ArrayList<ASTNode>() : args;
+      args.add(arg);
+    }
+
+    public PartitionedTableFunctionSpec getStartOfChain() {
+      if ( input instanceof PartitionedTableFunctionSpec ) {
+        return ((PartitionedTableFunctionSpec)input).getStartOfChain();
+      }
+      return this;
+    }
+    @Override
+    public String getQueryInputName() {
+      return input.getQueryInputName();
+    }
+    @Override
+    public PTFQueryInputSpec getQueryInput() {
+      return input.getQueryInput();
+    }
+  }
+
+  /*
+   * Captures how the Input to a PTF Function should be partitioned and
+   * ordered. Refers to a /Partition/ and /Order/ instance.
+   */
+  public static class PartitioningSpec {
+    PartitionSpec partSpec;
+    OrderSpec orderSpec;
+    public PartitionSpec getPartSpec() {
+      return partSpec;
+    }
+    public void setPartSpec(PartitionSpec partSpec) {
+      this.partSpec = partSpec;
+    }
+    public OrderSpec getOrderSpec() {
+      return orderSpec;
+    }
+    public void setOrderSpec(OrderSpec orderSpec) {
+      this.orderSpec = orderSpec;
+    }
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((orderSpec == null) ? 0 : orderSpec.hashCode());
+      result = prime * result + ((partSpec == null) ? 0 : partSpec.hashCode());
+      return result;
+    }
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      PartitioningSpec other = (PartitioningSpec) obj;
+      if (orderSpec == null) {
+        if (other.orderSpec != null) {
+          return false;
+        }
+      } else if (!orderSpec.equals(other.orderSpec)) {
+        return false;
+      }
+      if (partSpec == null) {
+        if (other.partSpec != null) {
+          return false;
+        }
+      } else if (!partSpec.equals(other.partSpec)) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  /*
+   * Captures how an Input should be Partitioned. This is captured as a
+   * list of ASTNodes that are the expressions in the Distribute/Cluster
+   * by clause specifying the partitioning applied for a PTF invocation.
+   */
+  public static class PartitionSpec {
+    ArrayList<PartitionExpression> expressions;
+
+    public ArrayList<PartitionExpression> getExpressions()
+    {
+      return expressions;
+    }
+
+    public void setExpressions(ArrayList<PartitionExpression> columns)
+    {
+      this.expressions = columns;
+    }
+
+    public void addExpression(PartitionExpression c)
+    {
+      expressions = expressions == null ? new ArrayList<PartitionExpression>() : expressions;
+      expressions.add(c);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expressions == null) ? 0 : expressions.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      PartitionSpec other = (PartitionSpec) obj;
+      if (expressions == null)
+      {
+        if (other.expressions != null) {
+          return false;
+        }
+      }
+      else if (!expressions.equals(other.expressions)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("partitionColumns=%s",PTFUtils.toString(expressions));
+    }
+  }
+
+  public static class PartitionExpression
+  {
+    ASTNode expression;
+
+    public PartitionExpression() {}
+
+    public PartitionExpression(PartitionExpression peSpec)
+    {
+      expression = peSpec.getExpression();
+    }
+
+    public ASTNode getExpression() {
+      return expression;
+    }
+
+    public void setExpression(ASTNode expression) {
+      this.expression = expression;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expression == null) ? 0 : expression.toStringTree().hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!getClass().isAssignableFrom(obj.getClass())) {
+        return false;
+      }
+      PartitionExpression other = (PartitionExpression) obj;
+      if (expression == null) {
+        if (other.expression != null) {
+          return false;
+        }
+      } else if (!expression.toStringTree().equals(other.expression.toStringTree())) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+    return expression.toStringTree();
+    }
+
+  }
+
+  /*
+   * Captures how the Input should be Ordered. This is captured as a list
+   * of ASTNodes that are the expressions in the Sort By clause in a
+   * PTF invocation.
+   */
+  public static class OrderSpec
+  {
+    ArrayList<OrderExpression> expressions;
+
+    public OrderSpec() {}
+
+    public OrderSpec(PartitionSpec pSpec)
+    {
+      for(PartitionExpression peSpec : pSpec.getExpressions())
+      {
+        addExpression(new OrderExpression(peSpec));
+      }
+    }
+
+    public ArrayList<OrderExpression> getExpressions()
+    {
+      return expressions;
+    }
+
+    public void setExpressions(ArrayList<OrderExpression> columns)
+    {
+      this.expressions = columns;
+    }
+
+    public void addExpression(OrderExpression c)
+    {
+      expressions = expressions == null ? new ArrayList<OrderExpression>() : expressions;
+      expressions.add(c);
+    }
+
+    protected boolean isPrefixedBy(PartitionSpec pSpec) {
+      if ( pSpec == null || pSpec.getExpressions() == null) {
+        return true;
+      }
+
+      int pExprCnt = pSpec.getExpressions().size();
+      int exprCnt = getExpressions() == null ? 0 : getExpressions().size();
+
+      if ( exprCnt < pExprCnt ) {
+        return false;
+      }
+
+      for(int i=0; i < pExprCnt; i++) {
+        if ( !pSpec.getExpressions().get(i).equals(getExpressions().get(i)) ) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    protected void prefixBy(PartitionSpec pSpec) {
+      if ( pSpec == null || pSpec.getExpressions() == null) {
+        return;
+      }
+      if ( expressions == null ) {
+        expressions = new ArrayList<PTFInvocationSpec.OrderExpression>();
+      }
+      for(int i = pSpec.getExpressions().size() - 1; i >= 0; i--) {
+        expressions.add(0, new OrderExpression(pSpec.getExpressions().get(i)));
+      }
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((expressions == null) ? 0 : expressions.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      OrderSpec other = (OrderSpec) obj;
+      if (expressions == null)
+      {
+        if (other.expressions != null) {
+          return false;
+        }
+      }
+      else if (!expressions.equals(other.expressions)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("orderColumns=%s",PTFUtils.toString(expressions));
+    }
+  }
+
+  public static enum Order
+  {
+    ASC,
+    DESC;
+  }
+
+  public static class OrderExpression extends PartitionExpression
+  {
+    Order order;
+
+    public OrderExpression() {}
+
+    public OrderExpression(PartitionExpression peSpec)
+    {
+      super(peSpec);
+      order = Order.ASC;
+    }
+
+    public Order getOrder()
+    {
+      return order;
+    }
+
+    public void setOrder(Order order)
+    {
+      this.order = order;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((order == null) ? 0 : order.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (!super.equals(obj)) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      OrderExpression other = (OrderExpression) obj;
+      if (order != other.order) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("%s %s", super.toString(), order);
+    }
+  }
+
+}