You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by od...@apache.org on 2017/03/28 00:12:10 UTC
[1/2] incubator-hawq git commit: [#140722851] PXF to use file level
stats for count(*) without condition.
Repository: incubator-hawq
Updated Branches:
refs/heads/140722851 [created] 478f0a891
[#140722851] PXF to use file level stats for count(*) without condition.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/73a20485
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/73a20485
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/73a20485
Branch: refs/heads/140722851
Commit: 73a204854f6400d31f5b26237ed8df9dfb087d91
Parents: 55d9e85
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Wed Mar 8 13:45:13 2017 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Fri Mar 17 15:18:25 2017 -0700
----------------------------------------------------------------------
.../pxf/api/utilities/EnumAggregationType.java | 51 +++++++++++
.../pxf/api/utilities/FragmentMetadata.java | 59 +++++++++++++
.../hawq/pxf/api/utilities/InputData.java | 9 ++
.../hawq/pxf/api/utilities/Utilities.java | 25 ++++++
.../plugins/hdfs/HdfsAtomicDataAccessor.java | 2 +-
.../hdfs/HdfsSplittableDataAccessor.java | 2 +-
.../plugins/hdfs/utilities/HdfsUtilities.java | 26 +-----
.../hawq/pxf/plugins/hive/HiveORCAccessor.java | 69 ++++++++++++++-
.../plugins/hive/utilities/HiveUtilities.java | 23 +++++
pxf/pxf-service/src/main/.DS_Store | Bin 0 -> 6148 bytes
.../org/apache/hawq/pxf/service/AggBridge.java | 86 +++++++++++++++++++
.../hawq/pxf/service/rest/BridgeResource.java | 4 +-
.../pxf/service/utilities/ProtocolData.java | 5 ++
13 files changed, 333 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
new file mode 100644
index 0000000..ee38f18
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+public enum EnumAggregationType {
+
+ COUNT("count", true),
+ UNKNOWN("unknown", false);
+
+ private String aggOperationCode;
+ private boolean optimizationSupported;
+
+ EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
+ this.aggOperationCode = aggOperationCode;
+ this.optimizationSupported = optimizationSupported;
+ }
+
+ public String getAggOperationCode() {
+ return this.aggOperationCode;
+ }
+
+ public boolean isOptimizationSupported() {
+ return this.optimizationSupported;
+ }
+
+ public static EnumAggregationType getAggregationType(String aggOperationCode) {
+ for (EnumAggregationType at : values()) {
+ if (at.getAggOperationCode().equals(aggOperationCode)) {
+ return at;
+ }
+ }
+ return EnumAggregationType.UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
new file mode 100644
index 0000000..7a82065
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+public class FragmentMetadata {
+
+
+ private long start;
+ private long end;
+ private String[] hosts;
+
+ public FragmentMetadata(long start, long end, String[] hosts) {
+ this.start = start;
+ this.end = end;
+ this.hosts = hosts;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ public String[] getHosts() {
+ return hosts;
+ }
+
+ public void setHosts(String[] hosts) {
+ this.hosts = hosts;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 9816fdc..7950ed3 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -52,6 +52,7 @@ public class InputData {
protected String remoteLogin;
protected String remoteSecret;
protected int dataFragment; /* should be deprecated */
+ private EnumAggregationType aggType;
/**
* When false the bridge has to run in synchronized mode. default value -
@@ -335,4 +336,12 @@ public class InputData {
return dataFragment;
}
+ public EnumAggregationType getAggType() {
+ return aggType;
+ }
+
+ public void setAggType(EnumAggregationType aggType) {
+ this.aggType = aggType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index 51326bc..f59a07a 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -20,9 +20,13 @@ package org.apache.hawq.pxf.api.utilities;
*/
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -151,4 +155,25 @@ public class Utilities {
}
return input.replaceAll("[^a-zA-Z0-9_:/-]", ".");
}
+
+ public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
+ byte[] serializedLocation = inputData.getFragmentMetadata();
+ if (serializedLocation == null) {
+ throw new IllegalArgumentException("Missing fragment location information");
+ }
+ try (ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedLocation);
+ ObjectInputStream objectStream = new ObjectInputStream(bytesStream)) {
+ long start = objectStream.readLong();
+ long end = objectStream.readLong();
+ String[] hosts = (String[]) objectStream.readObject();
+ LOG.debug("parsed file split: path " + inputData.getDataSource()
+ + ", start " + start + ", end " + end + ", hosts "
+ + ArrayUtils.toString(hosts));
+ FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts);
+ return fragmentMetadata;
+ } catch (Exception e) {
+ LOG.error("Unable to parse fragment metadata");
+ throw e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
index 178b774..a95248e 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -65,7 +65,7 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
// 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
conf = new Configuration();
- fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+ fileSplit = HdfsUtilities.parseFileSplit(inputData);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
index 0174bd8..b61d76a 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -75,7 +75,7 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements
@Override
public boolean openForRead() throws Exception {
LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
- FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+ FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData);
requestSplits.add(fileSplit);
// Initialize record reader based on current split
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
index c99ccd6..1aae838 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -22,6 +22,7 @@ package org.apache.hawq.pxf.plugins.hdfs.utilities;
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.avro.Schema;
@@ -30,7 +31,6 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -171,29 +171,11 @@ public class HdfsUtilities {
* @param inputData request input data
* @return FileSplit with fragment metadata
*/
- public static FileSplit parseFragmentMetadata(InputData inputData) {
+ public static FileSplit parseFileSplit(InputData inputData) {
try {
- byte[] serializedLocation = inputData.getFragmentMetadata();
- if (serializedLocation == null) {
- throw new IllegalArgumentException(
- "Missing fragment location information");
- }
-
- ByteArrayInputStream bytesStream = new ByteArrayInputStream(
- serializedLocation);
- ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-
- long start = objectStream.readLong();
- long end = objectStream.readLong();
-
- String[] hosts = (String[]) objectStream.readObject();
-
- FileSplit fileSplit = new FileSplit(new Path(
- inputData.getDataSource()), start, end, hosts);
+ FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
- LOG.debug("parsed file split: path " + inputData.getDataSource()
- + ", start " + start + ", end " + end + ", hosts "
- + ArrayUtils.toString(hosts));
+ FileSplit fileSplit = new FileSplit(new Path(inputData.getDataSource()), fragmentMetadata.getStart(), fragmentMetadata.getEnd(), fragmentMetadata.getHosts());
return fileSplit;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 07348b0..0e2fc2a 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -22,17 +22,28 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hawq.pxf.api.BasicFilter;
import org.apache.hawq.pxf.api.LogicalFilter;
+import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.mapred.*;
+import java.io.IOException;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
@@ -53,6 +64,11 @@ public class HiveORCAccessor extends HiveAccessor {
private final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
private final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
private final String SARG_PUSHDOWN = "sarg.pushdown";
+ protected Reader orcReader;
+
+ private boolean useStats;
+ private long count;
+ private long objectsEmitted;
/**
* Constructs a HiveORCFileAccessor.
@@ -65,13 +81,35 @@ public class HiveORCAccessor extends HiveAccessor {
HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
initPartitionFields(hiveUserData.getPartitionKeys());
filterInFragmenter = hiveUserData.isFilterInFragmenter();
+
+ if (inputData != null && !inputData.hasFilter() && inputData.getAggType() != null && inputData.getAggType().isOptimizationSupported()) {
+ useStats = true;
+ }
+ }
+
+ private void retrieveStats() throws Exception {
+ FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
+ /* We are using file-level stats; therefore, if the file has multiple splits,
+ * it's enough to return the count for the first split in the file. */
+ if (fragmentMetadata.getStart() == 0)
+ this.count = this.orcReader.getNumberOfRows();
}
@Override
public boolean openForRead() throws Exception {
- addColumns();
- addFilters();
- return super.openForRead();
+ if (useStats) {
+ orcReader = HiveUtilities.getOrcReader(inputData);
+ if (orcReader == null) {
+ return false;
+ }
+ retrieveStats();
+ objectsEmitted = 0;
+ return true;
+ } else {
+ addColumns();
+ addFilters();
+ return super.openForRead();
+ }
}
/**
@@ -213,4 +251,29 @@ public class HiveORCAccessor extends HiveAccessor {
return true;
}
+ @Override
+ public OneRow readNextObject() throws IOException {
+ if (useStats)
+ return emitAggObject();
+ else
+ return super.readNextObject();
+ }
+
+ private OneRow emitAggObject() {
+ OneRow row = null;
+ switch (inputData.getAggType()) {
+ case COUNT:
+ if (objectsEmitted < count) {
+ objectsEmitted++;
+ row = new OneRow(key, data);
+ }
+ break;
+ default: {
+ throw new UnsupportedOperationException(
+ "Aggregation operation is not supported.");
+ }
+ }
+ return row;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index 3328c9f..6d33283 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -30,6 +30,8 @@ import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
@@ -45,6 +47,7 @@ import org.apache.hawq.pxf.api.Metadata.Field;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.api.io.DataType;
@@ -57,6 +60,12 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
import org.apache.hawq.pxf.plugins.hive.HiveUserData;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.ReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.ql.io.orc.StripeStatistics;
/**
* Class containing helper functions connecting
@@ -627,4 +636,18 @@ public class HiveUtilities {
return deserializer;
}
+
+ public static Reader getOrcReader(InputData inputData) {
+ try {
+
+ Path path = new Path(inputData.getDataSource());
+ Reader reader = OrcFile.createReader(path.getFileSystem(new Configuration()), path);
+
+ return reader;
+
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Exception while getting orc reader", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/.DS_Store
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/.DS_Store b/pxf/pxf-service/src/main/.DS_Store
new file mode 100644
index 0000000..db3318c
Binary files /dev/null and b/pxf/pxf-service/src/main/.DS_Store differ
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
new file mode 100644
index 0000000..d274864
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.collections.map.LRUMap;
+
+public class AggBridge extends ReadBridge implements Bridge {
+
+ private static final Log LOG = LogFactory.getLog(AggBridge.class);
+ private LRUMap resolvedFieldsCache;
+
+ public AggBridge(ProtocolData protData) throws Exception {
+ super(protData);
+ }
+
+ @Override
+ public boolean beginIteration() throws Exception {
+ /* Initialize LRU cache with 100 items*/
+ resolvedFieldsCache = new LRUMap();
+ return super.fileAccessor.openForRead();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Writable getNext() throws Exception {
+ Writable output = null;
+ List<OneField> resolvedFields = null;
+ OneRow onerow = null;
+
+ if (!outputQueue.isEmpty()) {
+ return outputQueue.pop();
+ }
+
+ try {
+ while (outputQueue.isEmpty()) {
+ onerow = fileAccessor.readNextObject();
+ if (onerow == null) {
+ break;
+ }
+ resolvedFields = (List<OneField>) resolvedFieldsCache.get(onerow.getKey());
+ if (resolvedFields == null) {
+ resolvedFields = fieldsResolver.getFields(onerow);
+ resolvedFieldsCache.put(onerow.getKey(), resolvedFields);
+ }
+ outputQueue = outputBuilder.makeOutput(resolvedFields);
+ if (!outputQueue.isEmpty()) {
+ output = outputQueue.pop();
+ break;
+ }
+ }
+ } catch (Exception ex) {
+ throw ex;
+ }
+
+ return output;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 104f353..39dc985 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -39,7 +39,7 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.catalina.connector.ClientAbortException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
+import org.apache.hawq.pxf.service.AggBridge;
import org.apache.hawq.pxf.service.Bridge;
import org.apache.hawq.pxf.service.ReadBridge;
import org.apache.hawq.pxf.service.ReadSamplingBridge;
@@ -98,6 +98,8 @@ public class BridgeResource extends RestResource {
float sampleRatio = protData.getStatsSampleRatio();
if (sampleRatio > 0) {
bridge = new ReadSamplingBridge(protData);
+ } else if (protData.getAggType().isOptimizationSupported()) {
+ bridge = new AggBridge(protData);
} else {
bridge = new ReadBridge(protData);
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index dc2a110..6f21068 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.ProfilesConf;
@@ -115,6 +116,10 @@ public class ProtocolData extends InputData {
// Store alignment for global use as a system property
System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
+
+ String aggTypeOperationName = getOptionalProperty("AGG-TYPE");
+
+ this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));
}
/**
[2/2] incubator-hawq git commit: [#140722851] Applied code-review
feedback.
Posted by od...@apache.org.
[#140722851] Applied code-review feedback.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/478f0a89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/478f0a89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/478f0a89
Branch: refs/heads/140722851
Commit: 478f0a89163554a835bf3963b4a7e2df3341fe34
Parents: 73a2048
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Mon Mar 27 17:11:47 2017 -0700
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Mon Mar 27 17:11:47 2017 -0700
----------------------------------------------------------------------
.../org/apache/hawq/pxf/api/StatsAccessor.java | 40 ++++++++++++++++++
.../pxf/api/utilities/EnumAggregationType.java | 2 +-
.../pxf/api/utilities/FragmentMetadata.java | 17 +++++++-
.../hawq/pxf/api/utilities/Utilities.java | 44 ++++++++++++++++++--
.../hawq/pxf/plugins/hive/HiveORCAccessor.java | 42 ++++++++++---------
.../org/apache/hawq/pxf/service/AggBridge.java | 5 +++
.../pxf/service/utilities/ProtocolData.java | 1 +
7 files changed, 127 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
new file mode 100644
index 0000000..724448b
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api;
+
+import org.apache.hawq.pxf.api.OneRow;
+
+/**
+ * Interface of accessor which can leverage statistic information for aggregate queries
+ *
+ */
+public interface StatsAccessor {
+
+ /**
+ * Method which reads needed statistics for current split
+ */
+ public void retrieveStats() throws Exception;
+
+ /**
+ * Returns the next tuple based on statistics information, without actually reading the data
+ */
+ public OneRow emitAggObject();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
index ee38f18..69716c6 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
@@ -27,7 +27,7 @@ public enum EnumAggregationType {
private String aggOperationCode;
private boolean optimizationSupported;
- EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
+ private EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
this.aggOperationCode = aggOperationCode;
this.optimizationSupported = optimizationSupported;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
index 7a82065..7f266ec 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
@@ -19,9 +19,12 @@
package org.apache.hawq.pxf.api.utilities;
+/**
+ * Class which holds metadata of a file split and locality information.
+ *
+ */
public class FragmentMetadata {
-
private long start;
private long end;
private String[] hosts;
@@ -32,6 +35,10 @@ public class FragmentMetadata {
this.hosts = hosts;
}
+ /**
+ *
+ * @return position in bytes where given data fragment starts
+ */
public long getStart() {
return start;
}
@@ -40,6 +47,10 @@ public class FragmentMetadata {
this.start = start;
}
+ /**
+ *
+ * @return the position in bytes where the given data fragment ends
+ */
public long getEnd() {
return end;
}
@@ -48,6 +59,10 @@ public class FragmentMetadata {
this.end = end;
}
+ /**
+ *
+ * @return all hosts which have the given data fragment
+ */
public String[] getHosts() {
return hosts;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index f59a07a..f948888 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -23,6 +23,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -156,6 +158,14 @@ public class Utilities {
return input.replaceAll("[^a-zA-Z0-9_:/-]", ".");
}
+ /**
+ * Parses input data and returns fragment metadata.
+ *
+ * @param inputData input data which has protocol information
+ * @return fragment metadata
+ * @throws IllegalArgumentException if fragment metadata information wasn't found in input data
+ * @throws Exception if the fragment metadata cannot be deserialized
+ */
public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
byte[] serializedLocation = inputData.getFragmentMetadata();
if (serializedLocation == null) {
@@ -166,9 +176,18 @@ public class Utilities {
long start = objectStream.readLong();
long end = objectStream.readLong();
String[] hosts = (String[]) objectStream.readObject();
- LOG.debug("parsed file split: path " + inputData.getDataSource()
- + ", start " + start + ", end " + end + ", hosts "
- + ArrayUtils.toString(hosts));
+ if (LOG.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("parsed file split: path ");
+ sb.append(inputData.getDataSource());
+ sb.append(", start ");
+ sb.append(start);
+ sb.append(", end ");
+ sb.append(end);
+ sb.append(", hosts ");
+ sb.append(ArrayUtils.toString(hosts));
+ LOG.debug(sb.toString());
+ }
FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts);
return fragmentMetadata;
} catch (Exception e) {
@@ -176,4 +195,23 @@ public class Utilities {
throw e;
}
}
+
+
+ /**
+ * Determines whether the accessor should use statistics to optimize reading results
+ *
+ * @param accessor accessor instance
+ * @param inputData input data which has protocol information
+ * @return true if this accessor should use statistics information
+ */
+ public static boolean useStats(ReadAccessor accessor, InputData inputData) {
+ if (accessor instanceof StatsAccessor) {
+ if (inputData != null && !inputData.hasFilter()
+ && inputData.getAggType() != null
+ && inputData.getAggType().isOptimizationSupported()) {
+ return true;
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 0e2fc2a..24b355c 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -22,7 +22,6 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -31,16 +30,14 @@ import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hawq.pxf.api.BasicFilter;
import org.apache.hawq.pxf.api.LogicalFilter;
import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
-import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
@@ -56,7 +53,7 @@ import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_
* This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files.
* Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}
*/
-public class HiveORCAccessor extends HiveAccessor {
+public class HiveORCAccessor extends HiveAccessor implements StatsAccessor {
private static final Log LOG = LogFactory.getLog(HiveORCAccessor.class);
@@ -81,18 +78,7 @@ public class HiveORCAccessor extends HiveAccessor {
HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
initPartitionFields(hiveUserData.getPartitionKeys());
filterInFragmenter = hiveUserData.isFilterInFragmenter();
-
- if (inputData != null && !inputData.hasFilter() && inputData.getAggType() != null && inputData.getAggType().isOptimizationSupported()) {
- useStats = true;
- }
- }
-
- private void retrieveStats() throws Exception {
- FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
- /* We are using file-level stats therefore if file has multiple splits,
- * it's enough to return count for a first split in file*/
- if (fragmentMetadata.getStart() == 0)
- this.count = this.orcReader.getNumberOfRows();
+ useStats = Utilities.useStats(this, inputData);
}
@Override
@@ -104,7 +90,7 @@ public class HiveORCAccessor extends HiveAccessor {
}
retrieveStats();
objectsEmitted = 0;
- return true;
+ return super.openForRead();
} else {
addColumns();
addFilters();
@@ -259,7 +245,25 @@ public class HiveORCAccessor extends HiveAccessor {
return super.readNextObject();
}
- private OneRow emitAggObject() {
+ /**
+ * Fetches file-level statistics from an ORC file.
+ */
+ @Override
+ public void retrieveStats() throws Exception {
+ FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
+ /*
+ * We are using file-level stats; therefore, if the file has multiple splits,
+ * it's enough to return the count for the first split in the file
+ */
+ if (fragmentMetadata.getStart() == 0)
+ this.count = this.orcReader.getNumberOfRows();
+ }
+
+ /**
+ * Emits a tuple without reading from disk; currently supports COUNT
+ */
+ @Override
+ public OneRow emitAggObject() {
OneRow row = null;
switch (inputData.getAggType()) {
case COUNT:
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
index d274864..12f44e2 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
@@ -32,9 +32,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.collections.map.LRUMap;
+/**
+ * Bridge class optimized for aggregate queries.
+ *
+ */
public class AggBridge extends ReadBridge implements Bridge {
private static final Log LOG = LogFactory.getLog(AggBridge.class);
+ /* Avoid resolving rows with the same key twice */
private LRUMap resolvedFieldsCache;
public AggBridge(ProtocolData protData) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 6f21068..0de356b 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -117,6 +117,7 @@ public class ProtocolData extends InputData {
// Store alignment for global use as a system property
System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
+ // Get the aggregation operation
String aggTypeOperationName = getOptionalProperty("AGG-TYPE");
this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));