You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by od...@apache.org on 2017/03/28 00:12:10 UTC

[1/2] incubator-hawq git commit: [#140722851] PXF to use file level stats for count(*) without condition.

Repository: incubator-hawq
Updated Branches:
  refs/heads/140722851 [created] 478f0a891


[#140722851] PXF to use file level stats for count(*) without condition.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/73a20485
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/73a20485
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/73a20485

Branch: refs/heads/140722851
Commit: 73a204854f6400d31f5b26237ed8df9dfb087d91
Parents: 55d9e85
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Wed Mar 8 13:45:13 2017 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Fri Mar 17 15:18:25 2017 -0700

----------------------------------------------------------------------
 .../pxf/api/utilities/EnumAggregationType.java  |  51 +++++++++++
 .../pxf/api/utilities/FragmentMetadata.java     |  59 +++++++++++++
 .../hawq/pxf/api/utilities/InputData.java       |   9 ++
 .../hawq/pxf/api/utilities/Utilities.java       |  25 ++++++
 .../plugins/hdfs/HdfsAtomicDataAccessor.java    |   2 +-
 .../hdfs/HdfsSplittableDataAccessor.java        |   2 +-
 .../plugins/hdfs/utilities/HdfsUtilities.java   |  26 +-----
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  |  69 ++++++++++++++-
 .../plugins/hive/utilities/HiveUtilities.java   |  23 +++++
 pxf/pxf-service/src/main/.DS_Store              | Bin 0 -> 6148 bytes
 .../org/apache/hawq/pxf/service/AggBridge.java  |  86 +++++++++++++++++++
 .../hawq/pxf/service/rest/BridgeResource.java   |   4 +-
 .../pxf/service/utilities/ProtocolData.java     |   5 ++
 13 files changed, 333 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
new file mode 100644
index 0000000..ee38f18
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+public enum EnumAggregationType {
+
+    COUNT("count", true),
+    UNKNOWN("unknown", false);
+
+    private String aggOperationCode;
+    private boolean optimizationSupported;
+
+    EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
+        this.aggOperationCode = aggOperationCode;
+        this.optimizationSupported = optimizationSupported;
+    }
+
+    public String getAggOperationCode() {
+        return this.aggOperationCode;
+    }
+
+    public boolean isOptimizationSupported() {
+        return this.optimizationSupported;
+    }
+
+    public static EnumAggregationType getAggregationType(String aggOperationCode) {
+        for (EnumAggregationType at : values()) {
+            if (at.getAggOperationCode().equals(aggOperationCode)) {
+                return at;
+            }
+        }
+        return EnumAggregationType.UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
new file mode 100644
index 0000000..7a82065
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+public class FragmentMetadata {
+
+
+    private long start;
+    private long end;
+    private String[] hosts;
+
+    public FragmentMetadata(long start, long end, String[] hosts) {
+        this.start = start;
+        this.end = end;
+        this.hosts = hosts;
+    }
+
+    public long getStart() {
+        return start;
+    }
+
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    public long getEnd() {
+        return end;
+    }
+
+    public void setEnd(long end) {
+        this.end = end;
+    }
+
+    public String[] getHosts() {
+        return hosts;
+    }
+
+    public void setHosts(String[] hosts) {
+        this.hosts = hosts;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 9816fdc..7950ed3 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -52,6 +52,7 @@ public class InputData {
     protected String remoteLogin;
     protected String remoteSecret;
     protected int dataFragment; /* should be deprecated */
+    private EnumAggregationType aggType;
 
     /**
      * When false the bridge has to run in synchronized mode. default value -
@@ -335,4 +336,12 @@ public class InputData {
         return dataFragment;
     }
 
+    public EnumAggregationType getAggType() {
+        return aggType;
+    }
+
+    public void setAggType(EnumAggregationType aggType) {
+        this.aggType = aggType;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index 51326bc..f59a07a 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -20,9 +20,13 @@ package org.apache.hawq.pxf.api.utilities;
  */
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
@@ -151,4 +155,25 @@ public class Utilities {
         }
         return input.replaceAll("[^a-zA-Z0-9_:/-]", ".");
     }
+
+    public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
+        byte[] serializedLocation = inputData.getFragmentMetadata();
+        if (serializedLocation == null) {
+            throw new IllegalArgumentException("Missing fragment location information");
+        }
+        try (ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedLocation);
+                ObjectInputStream objectStream = new ObjectInputStream(bytesStream)) {
+            long start = objectStream.readLong();
+            long end = objectStream.readLong();
+            String[] hosts = (String[]) objectStream.readObject();
+            LOG.debug("parsed file split: path " + inputData.getDataSource()
+                    + ", start " + start + ", end " + end + ", hosts "
+                    + ArrayUtils.toString(hosts));
+            FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts);
+            return fragmentMetadata;
+        } catch (Exception e) {
+            LOG.error("Unable to parse fragment metadata");
+            throw e;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
index 178b774..a95248e 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -65,7 +65,7 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
         // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
         conf = new Configuration();
 
-        fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+        fileSplit = HdfsUtilities.parseFileSplit(inputData);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
index 0174bd8..b61d76a 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -75,7 +75,7 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements
     @Override
     public boolean openForRead() throws Exception {
         LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
-        FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+        FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData);
         requestSplits.add(fileSplit);
 
         // Initialize record reader based on current split

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
index c99ccd6..1aae838 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -22,6 +22,7 @@ package org.apache.hawq.pxf.plugins.hdfs.utilities;
 
 import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.avro.Schema;
@@ -30,7 +31,6 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -171,29 +171,11 @@ public class HdfsUtilities {
      * @param inputData request input data
      * @return FileSplit with fragment metadata
      */
-    public static FileSplit parseFragmentMetadata(InputData inputData) {
+    public static FileSplit parseFileSplit(InputData inputData) {
         try {
-            byte[] serializedLocation = inputData.getFragmentMetadata();
-            if (serializedLocation == null) {
-                throw new IllegalArgumentException(
-                        "Missing fragment location information");
-            }
-
-            ByteArrayInputStream bytesStream = new ByteArrayInputStream(
-                    serializedLocation);
-            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-
-            long start = objectStream.readLong();
-            long end = objectStream.readLong();
-
-            String[] hosts = (String[]) objectStream.readObject();
-
-            FileSplit fileSplit = new FileSplit(new Path(
-                    inputData.getDataSource()), start, end, hosts);
+            FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
 
-            LOG.debug("parsed file split: path " + inputData.getDataSource()
-                    + ", start " + start + ", end " + end + ", hosts "
-                    + ArrayUtils.toString(hosts));
+            FileSplit fileSplit = new FileSplit(new Path(inputData.getDataSource()), fragmentMetadata.getStart(), fragmentMetadata.getEnd(), fragmentMetadata.getHosts());
 
             return fileSplit;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 07348b0..0e2fc2a 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -22,17 +22,28 @@ package org.apache.hawq.pxf.plugins.hive;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hawq.pxf.api.BasicFilter;
 import org.apache.hawq.pxf.api.LogicalFilter;
+import org.apache.hawq.pxf.api.OneRow;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.mapred.*;
 
+import java.io.IOException;
 import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,6 +64,11 @@ public class HiveORCAccessor extends HiveAccessor {
     private final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
     private final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
     private final String SARG_PUSHDOWN = "sarg.pushdown";
+    protected Reader orcReader;
+
+    private boolean useStats;
+    private long count;
+    private long objectsEmitted;
 
     /**
      * Constructs a HiveORCFileAccessor.
@@ -65,13 +81,35 @@ public class HiveORCAccessor extends HiveAccessor {
         HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
         initPartitionFields(hiveUserData.getPartitionKeys());
         filterInFragmenter = hiveUserData.isFilterInFragmenter();
+
+        if (inputData != null && !inputData.hasFilter() && inputData.getAggType() != null && inputData.getAggType().isOptimizationSupported()) {
+            useStats = true;
+        }
+    }
+
+    private void retrieveStats() throws Exception {
+        FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
+        /* We are using file-level stats; therefore, if a file has multiple splits,
+         * it's enough to return the count for the first split in the file. */
+        if (fragmentMetadata.getStart() == 0)
+            this.count = this.orcReader.getNumberOfRows();
     }
 
     @Override
     public boolean openForRead() throws Exception {
-        addColumns();
-        addFilters();
-        return super.openForRead();
+        if (useStats) {
+            orcReader = HiveUtilities.getOrcReader(inputData);
+            if (orcReader == null) {
+                return false;
+            }
+            retrieveStats();
+            objectsEmitted = 0;
+            return true;
+        } else {
+            addColumns();
+            addFilters();
+            return super.openForRead();
+        }
     }
 
     /**
@@ -213,4 +251,29 @@ public class HiveORCAccessor extends HiveAccessor {
         return true;
     }
 
+    @Override
+    public OneRow readNextObject() throws IOException {
+        if (useStats)
+            return emitAggObject();
+        else
+            return super.readNextObject();
+    }
+
+    private OneRow emitAggObject() {
+        OneRow row = null;
+        switch (inputData.getAggType()) {
+            case COUNT:
+                if (objectsEmitted < count) {
+                    objectsEmitted++;
+                    row = new OneRow(key, data);
+                }
+                break;
+            default: {
+                throw new UnsupportedOperationException(
+                        "Aggregation operation is not supported.");
+            }
+        }
+        return row;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index 3328c9f..6d33283 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -30,6 +30,8 @@ import java.util.Properties;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -45,6 +47,7 @@ import org.apache.hawq.pxf.api.Metadata.Field;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.UserDataException;
 import org.apache.hawq.pxf.api.utilities.EnumHawqType;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.api.io.DataType;
@@ -57,6 +60,12 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
 import org.apache.hawq.pxf.plugins.hive.HiveUserData;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.ReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.ql.io.orc.StripeStatistics;
 
 /**
  * Class containing helper functions connecting
@@ -627,4 +636,18 @@ public class HiveUtilities {
 
         return deserializer;
     }
+
+    public static Reader getOrcReader(InputData inputData) {
+        try {
+
+            Path path = new Path(inputData.getDataSource());
+            Reader reader = OrcFile.createReader(path.getFileSystem(new Configuration()), path);
+
+            return reader;
+
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Exception while getting orc reader", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/.DS_Store
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/.DS_Store b/pxf/pxf-service/src/main/.DS_Store
new file mode 100644
index 0000000..db3318c
Binary files /dev/null and b/pxf/pxf-service/src/main/.DS_Store differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
new file mode 100644
index 0000000..d274864
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.collections.map.LRUMap;
+
+public class AggBridge extends ReadBridge implements Bridge {
+
+    private static final Log LOG = LogFactory.getLog(AggBridge.class);
+    private LRUMap resolvedFieldsCache;
+
+    public AggBridge(ProtocolData protData) throws Exception {
+        super(protData);
+    }
+
+    @Override
+    public boolean beginIteration() throws Exception {
+        /* Initialize LRU cache with 100 items */
+        resolvedFieldsCache = new LRUMap();
+        return super.fileAccessor.openForRead();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Writable getNext() throws Exception {
+        Writable output = null;
+        List<OneField> resolvedFields = null;
+        OneRow onerow = null;
+
+        if (!outputQueue.isEmpty()) {
+            return outputQueue.pop();
+        }
+
+        try {
+            while (outputQueue.isEmpty()) {
+                onerow = fileAccessor.readNextObject();
+                if (onerow == null) {
+                    break;
+                }
+                resolvedFields = (List<OneField>) resolvedFieldsCache.get(onerow.getKey());
+                if (resolvedFields == null) {
+                    resolvedFields = fieldsResolver.getFields(onerow);
+                    resolvedFieldsCache.put(onerow.getKey(), resolvedFields);
+                }
+                outputQueue = outputBuilder.makeOutput(resolvedFields);
+                if (!outputQueue.isEmpty()) {
+                    output = outputQueue.pop();
+                    break;
+                }
+            }
+        } catch (Exception ex) {
+            throw ex;
+        }
+
+        return output;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 104f353..39dc985 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -39,7 +39,7 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.catalina.connector.ClientAbortException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import org.apache.hawq.pxf.service.AggBridge;
 import org.apache.hawq.pxf.service.Bridge;
 import org.apache.hawq.pxf.service.ReadBridge;
 import org.apache.hawq.pxf.service.ReadSamplingBridge;
@@ -98,6 +98,8 @@ public class BridgeResource extends RestResource {
         float sampleRatio = protData.getStatsSampleRatio();
         if (sampleRatio > 0) {
             bridge = new ReadSamplingBridge(protData);
+        } else if (protData.getAggType().isOptimizationSupported()) {
+            bridge = new AggBridge(protData);
         } else {
             bridge = new ReadBridge(protData);
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index dc2a110..6f21068 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hawq.pxf.api.OutputFormat;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.ProfilesConf;
 
@@ -115,6 +116,10 @@ public class ProtocolData extends InputData {
 
         // Store alignment for global use as a system property
         System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
+
+        String aggTypeOperationName = getOptionalProperty("AGG-TYPE");
+
+        this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));
     }
 
     /**


[2/2] incubator-hawq git commit: [#140722851] Applied code-review feedback.

Posted by od...@apache.org.
[#140722851] Applied code-review feedback.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/478f0a89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/478f0a89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/478f0a89

Branch: refs/heads/140722851
Commit: 478f0a89163554a835bf3963b4a7e2df3341fe34
Parents: 73a2048
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Mon Mar 27 17:11:47 2017 -0700
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Mon Mar 27 17:11:47 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hawq/pxf/api/StatsAccessor.java  | 40 ++++++++++++++++++
 .../pxf/api/utilities/EnumAggregationType.java  |  2 +-
 .../pxf/api/utilities/FragmentMetadata.java     | 17 +++++++-
 .../hawq/pxf/api/utilities/Utilities.java       | 44 ++++++++++++++++++--
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  | 42 ++++++++++---------
 .../org/apache/hawq/pxf/service/AggBridge.java  |  5 +++
 .../pxf/service/utilities/ProtocolData.java     |  1 +
 7 files changed, 127 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
new file mode 100644
index 0000000..724448b
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api;
+
+import org.apache.hawq.pxf.api.OneRow;
+
+/**
+ * Interface for an accessor which can leverage statistics information for aggregate queries.
+ *
+ */
+public interface StatsAccessor {
+
+    /**
+     * Method which reads the needed statistics for the current split.
+     */
+    public void retrieveStats() throws Exception;
+
+    /**
+     * Returns the next tuple based on statistics information, without actually reading the data.
+     */
+    public OneRow emitAggObject();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
index ee38f18..69716c6 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
@@ -27,7 +27,7 @@ public enum EnumAggregationType {
     private String aggOperationCode;
     private boolean optimizationSupported;
 
-    EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
+    private EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
         this.aggOperationCode = aggOperationCode;
         this.optimizationSupported = optimizationSupported;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
index 7a82065..7f266ec 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
@@ -19,9 +19,12 @@
 
 package org.apache.hawq.pxf.api.utilities;
 
+/**
+ * Class which holds metadata of a file split and locality information.
+ *
+ */
 public class FragmentMetadata {
 
-
     private long start;
     private long end;
     private String[] hosts;
@@ -32,6 +35,10 @@ public class FragmentMetadata {
         this.hosts = hosts;
     }
 
+    /**
+     * 
+     * @return position in bytes where the given data fragment starts
+     */
     public long getStart() {
         return start;
     }
@@ -40,6 +47,10 @@ public class FragmentMetadata {
         this.start = start;
     }
 
+    /**
+     * 
+     * @return position in bytes where the given data fragment ends
+     */
     public long getEnd() {
         return end;
     }
@@ -48,6 +59,10 @@ public class FragmentMetadata {
         this.end = end;
     }
 
+    /**
+     * 
+     * @return all hosts which have the given data fragment
+     */
     public String[] getHosts() {
         return hosts;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index f59a07a..f948888 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -23,6 +23,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -156,6 +158,14 @@ public class Utilities {
         return input.replaceAll("[^a-zA-Z0-9_:/-]", ".");
     }
 
+    /**
+     * Parses input data and returns fragment metadata.
+     * 
+     * @param inputData input data which has protocol information
+     * @return fragment metadata
+     * @throws IllegalArgumentException if fragment metadata information wasn't found in input data
+     * @throws Exception
+     */
     public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
         byte[] serializedLocation = inputData.getFragmentMetadata();
         if (serializedLocation == null) {
@@ -166,9 +176,18 @@ public class Utilities {
             long start = objectStream.readLong();
             long end = objectStream.readLong();
             String[] hosts = (String[]) objectStream.readObject();
-            LOG.debug("parsed file split: path " + inputData.getDataSource()
-                    + ", start " + start + ", end " + end + ", hosts "
-                    + ArrayUtils.toString(hosts));
+            if (LOG.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("parsed file split: path ");
+                sb.append(inputData.getDataSource());
+                sb.append(", start ");
+                sb.append(start);
+                sb.append(", end ");
+                sb.append(end);
+                sb.append(", hosts ");
+                sb.append(ArrayUtils.toString(hosts));
+                LOG.debug(sb.toString());
+            }
             FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts);
             return fragmentMetadata;
         } catch (Exception e) {
@@ -176,4 +195,23 @@ public class Utilities {
             throw e;
         }
     }
+
+
+    /**
+     * Determines whether the accessor should use statistics to optimize reading results
+     * 
+     * @param accessor accessor instance
+     * @param inputData input data which has protocol information
+     * @return true if this accessor should use statistics information
+     */
+    public static boolean useStats(ReadAccessor accessor, InputData inputData) {
+        if (accessor instanceof StatsAccessor) {
+            if (inputData != null && !inputData.hasFilter()
+                    && inputData.getAggType() != null
+                    && inputData.getAggType().isOptimizationSupported()) {
+                return true;
+            }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 0e2fc2a..24b355c 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -22,7 +22,6 @@ package org.apache.hawq.pxf.plugins.hive;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -31,16 +30,14 @@ import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hawq.pxf.api.BasicFilter;
 import org.apache.hawq.pxf.api.LogicalFilter;
 import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
 import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
-import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
 import org.apache.hadoop.mapred.*;
 
 import java.io.IOException;
@@ -56,7 +53,7 @@ import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_
  * This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files.
  * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}
  */
-public class HiveORCAccessor extends HiveAccessor {
+public class HiveORCAccessor extends HiveAccessor implements StatsAccessor {
 
     private static final Log LOG = LogFactory.getLog(HiveORCAccessor.class);
 
@@ -81,18 +78,7 @@ public class HiveORCAccessor extends HiveAccessor {
         HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
         initPartitionFields(hiveUserData.getPartitionKeys());
         filterInFragmenter = hiveUserData.isFilterInFragmenter();
-
-        if (inputData != null && !inputData.hasFilter() && inputData.getAggType() != null && inputData.getAggType().isOptimizationSupported()) {
-            useStats = true;
-        }
-    }
-
-    private void retrieveStats() throws Exception {
-        FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
-        /* We are using file-level stats therefore if file has multiple splits,
-         * it's enough to return count for a first split in file*/
-        if (fragmentMetadata.getStart() == 0)
-            this.count = this.orcReader.getNumberOfRows();
+        useStats = Utilities.useStats(this, inputData);
     }
 
     @Override
@@ -104,7 +90,7 @@ public class HiveORCAccessor extends HiveAccessor {
             }
             retrieveStats();
             objectsEmitted = 0;
-            return true;
+            return super.openForRead();
         } else {
             addColumns();
             addFilters();
@@ -259,7 +245,25 @@ public class HiveORCAccessor extends HiveAccessor {
             return super.readNextObject();
     }
 
-    private OneRow emitAggObject() {
+    /**
+     * Fetches file-level statistics from an ORC file.
+     */
+    @Override
+    public void retrieveStats() throws Exception {
+        FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
+        /*
+         * We are using file-level stats therefore if file has multiple splits,
+         * it's enough to return count for a first split in file
+         */
+        if (fragmentMetadata.getStart() == 0)
+            this.count = this.orcReader.getNumberOfRows();
+    }
+
+    /**
+     * Emits a tuple without reading from disk; currently supports COUNT.
+     */
+    @Override
+    public OneRow emitAggObject() {
         OneRow row = null;
         switch (inputData.getAggType()) {
             case COUNT:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
index d274864..12f44e2 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
@@ -32,9 +32,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.collections.map.LRUMap;
 
+/**
+ * Bridge class optimized for aggregate queries.
+ *
+ */
 public class AggBridge extends ReadBridge implements Bridge {
 
     private static final Log LOG = LogFactory.getLog(AggBridge.class);
+    /* Avoid resolving rows with the same key twice */
     private LRUMap resolvedFieldsCache;
 
     public AggBridge(ProtocolData protData) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 6f21068..0de356b 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -117,6 +117,7 @@ public class ProtocolData extends InputData {
         // Store alignment for global use as a system property
         System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
 
+        //Get aggregation operation
         String aggTypeOperationName = getOptionalProperty("AGG-TYPE");
 
         this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));