Posted to commits@hawq.apache.org by od...@apache.org on 2017/04/04 09:32:46 UTC
incubator-hawq git commit: HAWQ-1404. PXF to leverage file-level stats of ORC file and emit records for COUNT(*).
Repository: incubator-hawq
Updated Branches:
refs/heads/master be0547200 -> f5ffddf26
HAWQ-1404. PXF to leverage file-level stats of ORC file and emit records for COUNT(*).
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/f5ffddf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/f5ffddf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/f5ffddf2
Branch: refs/heads/master
Commit: f5ffddf26ddc990d616c905f883c383e7f1c8542
Parents: be05472
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Tue Apr 4 02:32:28 2017 -0700
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Tue Apr 4 02:32:28 2017 -0700
----------------------------------------------------------------------
.../org/apache/hawq/pxf/api/StatsAccessor.java | 40 +++++++++
.../pxf/api/utilities/EnumAggregationType.java | 50 +++++++++++
.../pxf/api/utilities/FragmentMetadata.java | 86 ++++++++++++++++++
.../hawq/pxf/api/utilities/InputData.java | 34 +++++++
.../hawq/pxf/api/utilities/Utilities.java | 80 +++++++++++++++++
.../hawq/pxf/api/utilities/UtilitiesTest.java | 91 +++++++++++++++++++
.../plugins/hdfs/HdfsAtomicDataAccessor.java | 2 +-
.../hdfs/HdfsSplittableDataAccessor.java | 2 +-
.../plugins/hdfs/utilities/HdfsUtilities.java | 26 +-----
.../hdfs/utilities/HdfsUtilitiesTest.java | 21 +++++
.../hawq/pxf/plugins/hive/HiveORCAccessor.java | 77 +++++++++++++++-
.../plugins/hive/utilities/HiveUtilities.java | 27 +++++-
.../pxf/plugins/hive/HiveORCAccessorTest.java | 13 +++
.../org/apache/hawq/pxf/service/AggBridge.java | 95 ++++++++++++++++++++
.../hawq/pxf/service/rest/BridgeResource.java | 5 +-
.../pxf/service/utilities/ProtocolData.java | 13 +++
src/bin/gpfusion/gpbridgeapi.c | 1 +
17 files changed, 632 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
new file mode 100644
index 0000000..7ecdf52
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api;
+
+import org.apache.hawq.pxf.api.OneRow;
+
+/**
+ * Interface for accessors that can leverage statistics for aggregate queries
+ *
+ */
+public interface StatsAccessor extends ReadAccessor {
+
+ /**
+ * Reads the statistics needed for the current split
+ */
+ public void retrieveStats() throws Exception;
+
+ /**
+ * Returns the next tuple based on statistics, without actually reading the data
+ */
+ public OneRow emitAggObject();
+
+}
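
For orientation, the call sequence a bridge is expected to follow against this interface mirrors the AggBridge added later in this commit. The sketch below is illustrative only; the drainSplit helper and the accessor variable are not part of this change (assumed imports: org.apache.hawq.pxf.api.OneRow, org.apache.hawq.pxf.api.StatsAccessor):

    static long drainSplit(StatsAccessor accessor) throws Exception {
        accessor.openForRead();      // inherited from ReadAccessor
        accessor.retrieveStats();    // load file-level statistics for the current split
        long emitted = 0;
        while (accessor.emitAggObject() != null) {
            emitted++;               // tuples are produced from stats; no data is read
        }
        accessor.closeForRead();
        return emitted;
    }
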
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
new file mode 100644
index 0000000..5557c49
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+public enum EnumAggregationType {
+
+ COUNT("count", true);
+
+ private String aggOperationCode;
+ private boolean optimizationSupported;
+
+ private EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
+ this.aggOperationCode = aggOperationCode;
+ this.optimizationSupported = optimizationSupported;
+ }
+
+ public String getAggOperationCode() {
+ return this.aggOperationCode;
+ }
+
+ public boolean isOptimizationSupported() {
+ return this.optimizationSupported;
+ }
+
+ public static EnumAggregationType getAggregationType(String aggOperationCode) {
+ for (EnumAggregationType at : values()) {
+ if (at.getAggOperationCode().equals(aggOperationCode)) {
+ return at;
+ }
+ }
+ return null;
+ }
+}
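
A quick usage sketch for the enum above (the lookups are illustrative; only COUNT is defined by this change):

    EnumAggregationType count = EnumAggregationType.getAggregationType("count"); // COUNT
    EnumAggregationType other = EnumAggregationType.getAggregationType("sum");   // null, not defined
    boolean optimizable = count.isOptimizationSupported();                       // true for COUNT
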
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
new file mode 100644
index 0000000..7b0a3c4
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+/**
+ * Class which holds metadata of a file split and locality information.
+ *
+ */
+public class FragmentMetadata {
+
+ private long start;
+ private long end;
+ private String[] hosts;
+
+ public FragmentMetadata(long start, long end, String[] hosts) {
+ this.start = start;
+ this.end = end;
+ this.hosts = hosts;
+ }
+
+ /**
+ * Returns start position of a fragment
+ * @return position in bytes where given data fragment starts
+ */
+ public long getStart() {
+ return start;
+ }
+
+ /**
+ * Sets start position of a fragment
+ * @param start start position
+ */
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ /**
+ * Returns end position of a fragment
+ * @return position in bytes where given data fragment ends
+ */
+ public long getEnd() {
+ return end;
+ }
+
+ /**
+ * Sets end position of a fragment
+ * @param end end position
+ */
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ /**
+ * Returns all hosts which have given data fragment
+ * @return all hosts which have given data fragment
+ */
+ public String[] getHosts() {
+ return hosts;
+ }
+
+ /**
+ * Sets hosts for a given fragment
+ * @param hosts hosts which have given fragment
+ */
+ public void setHosts(String[] hosts) {
+ this.hosts = hosts;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 9816fdc..959cda6 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -52,6 +52,8 @@ public class InputData {
protected String remoteLogin;
protected String remoteSecret;
protected int dataFragment; /* should be deprecated */
+ private EnumAggregationType aggType;
+ private int fragmentIndex;
/**
* When false the bridge has to run in synchronized mode. default value -
@@ -335,4 +337,36 @@ public class InputData {
return dataFragment;
}
+ /**
+ * Returns the aggregate type, e.g. count, min, max
+ * @return aggregate type
+ */
+ public EnumAggregationType getAggType() {
+ return aggType;
+ }
+
+ /**
+ * Sets the aggregate type, one of the {@link EnumAggregationType} values
+ * @param aggType aggregate type
+ */
+ public void setAggType(EnumAggregationType aggType) {
+ this.aggType = aggType;
+ }
+
+ /**
+ * Returns index of a fragment in a file
+ * @return index of a fragment
+ */
+ public int getFragmentIndex() {
+ return fragmentIndex;
+ }
+
+ /**
+ * Sets index of a fragment in a file
+ * @param fragmentIndex index of a fragment
+ */
+ public void setFragmentIndex(int fragmentIndex) {
+ this.fragmentIndex = fragmentIndex;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index 51326bc..29d9c52 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -20,9 +20,15 @@ package org.apache.hawq.pxf.api.utilities;
*/
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -151,4 +157,78 @@ public class Utilities {
}
return input.replaceAll("[^a-zA-Z0-9_:/-]", ".");
}
+
+ /**
+ * Parses input data and returns fragment metadata.
+ *
+ * @param inputData input data which has protocol information
+ * @return fragment metadata
+ * @throws IllegalArgumentException if fragment metadata information wasn't found in input data
+ * @throws Exception if the fragment metadata cannot be deserialized
+ */
+ public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
+ byte[] serializedLocation = inputData.getFragmentMetadata();
+ if (serializedLocation == null) {
+ throw new IllegalArgumentException("Missing fragment location information");
+ }
+ try (ObjectInputStream objectStream = new ObjectInputStream(new ByteArrayInputStream(serializedLocation))) {
+ long start = objectStream.readLong();
+ long end = objectStream.readLong();
+ String[] hosts = (String[]) objectStream.readObject();
+ if (LOG.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("parsed file split: path ");
+ sb.append(inputData.getDataSource());
+ sb.append(", start ");
+ sb.append(start);
+ sb.append(", end ");
+ sb.append(end);
+ sb.append(", hosts ");
+ sb.append(ArrayUtils.toString(hosts));
+ LOG.debug(sb.toString());
+ }
+ FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts);
+ return fragmentMetadata;
+ } catch (Exception e) {
+ LOG.error("Unable to parse fragment metadata");
+ throw e;
+ }
+ }
+
+ /**
+ * Determines, based on the accessor information, whether AggBridge should be used
+ *
+ * @param inputData input data containing accessor and aggregation information
+ * @return true if AggBridge is applicable for the current context
+ */
+ public static boolean useAggBridge(InputData inputData) {
+ boolean isStatsAccessor = false;
+ try {
+ isStatsAccessor = ArrayUtils.contains(Class.forName(inputData.getAccessor()).getInterfaces(), StatsAccessor.class);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Unable to load accessor class: " + e.getMessage());
+ return false;
+ }
+ return (inputData != null) && (inputData.getAggType() != null)
+ && inputData.getAggType().isOptimizationSupported()
+ && isStatsAccessor;
+ }
+
+ /**
+ * Determines whether the accessor should use statistics to optimize reading results
+ *
+ * @param accessor accessor instance
+ * @param inputData input data which has protocol information
+ * @return true if this accessor should use statistics information
+ */
+ public static boolean useStats(ReadAccessor accessor, InputData inputData) {
+ if (accessor instanceof StatsAccessor) {
+ if (inputData != null && !inputData.hasFilter()
+ && inputData.getAggType() != null
+ && inputData.getAggType().isOptimizationSupported()) {
+ return true;
+ }
+ }
+ return false;
+ }
}
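
The serialized form parseFragmentMetadata() expects is two longs (start, end) followed by a String[] of hosts, as the unit test below also demonstrates. A minimal sketch of producing such a buffer (offsets and host name are made up; assumed imports: java.io.ByteArrayOutputStream, java.io.ObjectOutputStream):

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeLong(0L);                           // fragment start offset in bytes
        out.writeLong(134217728L);                   // fragment end offset in bytes
        out.writeObject(new String[] { "host1" });   // hosts holding the fragment
    }
    // buffer.toByteArray() is the value InputData.getFragmentMetadata() should return
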
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
index 355ea42..6fe896a 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
@@ -22,10 +22,19 @@ package org.apache.hawq.pxf.api.utilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.junit.Test;
@@ -37,6 +46,49 @@ import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Class.class})
public class UtilitiesTest {
+ class StatsAccessorImpl implements StatsAccessor {
+
+ @Override
+ public boolean openForRead() throws Exception {
+ return false;
+ }
+
+ @Override
+ public OneRow readNextObject() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void closeForRead() throws Exception {
+ }
+
+ @Override
+ public void retrieveStats() throws Exception {
+ }
+
+ @Override
+ public OneRow emitAggObject() {
+ return null;
+ }
+ }
+
+ class NonStatsAccessorImpl implements ReadAccessor {
+
+ @Override
+ public boolean openForRead() throws Exception {
+ return false;
+ }
+
+ @Override
+ public OneRow readNextObject() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void closeForRead() throws Exception {
+ }
+ }
+
@Test
public void byteArrayToOctalStringNull() throws Exception {
StringBuilder sb = null;
@@ -114,4 +166,43 @@ public class UtilitiesTest {
result = Utilities.maskNonPrintables(input);
assertEquals("http://www.beatles.com/info.query.whoisthebest", result);
}
+
+ @Test
+ public void parseFragmentMetadata() throws Exception {
+ InputData metaData = mock(InputData.class);
+ ByteArrayOutputStream bas = new ByteArrayOutputStream();
+ ObjectOutputStream os = new ObjectOutputStream(bas);
+ os.writeLong(10);
+ os.writeLong(100);
+ os.writeObject(new String[] { "hostname" });
+ os.close();
+ when(metaData.getFragmentMetadata()).thenReturn(bas.toByteArray());
+ FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(metaData);
+
+ assertEquals(10, fragmentMetadata.getStart());
+ assertEquals(100, fragmentMetadata.getEnd());
+ assertEquals(new String[] { "hostname" }, fragmentMetadata.getHosts());
+ }
+
+ @Test
+ public void useAggBridge() {
+ InputData metaData = mock(InputData.class);
+ when(metaData.getAccessor()).thenReturn(StatsAccessorImpl.class.getName());
+ when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+ assertTrue(Utilities.useAggBridge(metaData));
+
+ when(metaData.getAccessor()).thenReturn(UtilitiesTest.class.getName());
+ when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+ assertFalse(Utilities.useAggBridge(metaData));
+ }
+
+ @Test
+ public void useStats() {
+ InputData metaData = mock(InputData.class);
+ ReadAccessor accessor = new StatsAccessorImpl();
+ when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+ assertTrue(Utilities.useStats(accessor, metaData));
+ ReadAccessor nonStatsAccessor = new NonStatsAccessorImpl();
+ assertFalse(Utilities.useStats(nonStatsAccessor, metaData));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
index 178b774..a95248e 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -65,7 +65,7 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
// 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
conf = new Configuration();
- fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+ fileSplit = HdfsUtilities.parseFileSplit(inputData);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
index 0174bd8..b61d76a 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -75,7 +75,7 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements
@Override
public boolean openForRead() throws Exception {
LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
- FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+ FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData);
requestSplits.add(fileSplit);
// Initialize record reader based on current split
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
index c99ccd6..1aae838 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -22,6 +22,7 @@ package org.apache.hawq.pxf.plugins.hdfs.utilities;
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.avro.Schema;
@@ -30,7 +31,6 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -171,29 +171,11 @@ public class HdfsUtilities {
* @param inputData request input data
* @return FileSplit with fragment metadata
*/
- public static FileSplit parseFragmentMetadata(InputData inputData) {
+ public static FileSplit parseFileSplit(InputData inputData) {
try {
- byte[] serializedLocation = inputData.getFragmentMetadata();
- if (serializedLocation == null) {
- throw new IllegalArgumentException(
- "Missing fragment location information");
- }
-
- ByteArrayInputStream bytesStream = new ByteArrayInputStream(
- serializedLocation);
- ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-
- long start = objectStream.readLong();
- long end = objectStream.readLong();
-
- String[] hosts = (String[]) objectStream.readObject();
-
- FileSplit fileSplit = new FileSplit(new Path(
- inputData.getDataSource()), start, end, hosts);
+ FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
- LOG.debug("parsed file split: path " + inputData.getDataSource()
- + ", start " + start + ", end " + end + ", hosts "
- + ArrayUtils.toString(hosts));
+ FileSplit fileSplit = new FileSplit(new Path(inputData.getDataSource()), fragmentMetadata.getStart(), fragmentMetadata.getEnd(), fragmentMetadata.getHosts());
return fileSplit;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
index 36ca846..9b1fc6d 100644
--- a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
@@ -21,10 +21,12 @@ package org.apache.hawq.pxf.plugins.hdfs.utilities;
import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Before;
import org.junit.Test;
@@ -35,6 +37,8 @@ import org.powermock.core.classloader.annotations.SuppressStaticInitializationFo
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -198,4 +202,21 @@ public class HdfsUtilitiesTest {
assertEquals("", HdfsUtilities.toString(Collections.<OneField>emptyList(), "!"));
}
+
+ @Test
+ public void testParseFileSplit() throws Exception {
+ InputData inputData = mock(InputData.class);
+ when(inputData.getDataSource()).thenReturn("/abc/path/to/data/source");
+ ByteArrayOutputStream bas = new ByteArrayOutputStream();
+ ObjectOutputStream os = new ObjectOutputStream(bas);
+ os.writeLong(10);
+ os.writeLong(100);
+ os.writeObject(new String[] { "hostname" });
+ os.close();
+ when(inputData.getFragmentMetadata()).thenReturn(bas.toByteArray());
+ FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData);
+ assertEquals(fileSplit.getStart(), 10);
+ assertEquals(fileSplit.getLength(), 100);
+ assertEquals(fileSplit.getPath().toString(), "/abc/path/to/data/source");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 07348b0..5f7c584 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -23,16 +23,24 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hawq.pxf.api.BasicFilter;
import org.apache.hawq.pxf.api.LogicalFilter;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapred.*;
+import java.io.IOException;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,7 +53,7 @@ import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_
* This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files.
* Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}
*/
-public class HiveORCAccessor extends HiveAccessor {
+public class HiveORCAccessor extends HiveAccessor implements StatsAccessor {
private static final Log LOG = LogFactory.getLog(HiveORCAccessor.class);
@@ -53,6 +61,14 @@ public class HiveORCAccessor extends HiveAccessor {
private final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
private final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
private final String SARG_PUSHDOWN = "sarg.pushdown";
+ protected Reader orcReader;
+
+ private boolean useStats;
+ private long count;
+ private long objectsEmitted;
+ private OneRow rowToEmitCount;
+
+ private boolean statsInitialized;
/**
* Constructs a HiveORCFileAccessor.
@@ -65,12 +81,21 @@ public class HiveORCAccessor extends HiveAccessor {
HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
initPartitionFields(hiveUserData.getPartitionKeys());
filterInFragmenter = hiveUserData.isFilterInFragmenter();
+ useStats = Utilities.useStats(this, inputData);
}
@Override
public boolean openForRead() throws Exception {
- addColumns();
- addFilters();
+ if (useStats) {
+ orcReader = HiveUtilities.getOrcReader(inputData);
+ if (orcReader == null) {
+ return false;
+ }
+ objectsEmitted = 0;
+ } else {
+ addColumns();
+ addFilters();
+ }
return super.openForRead();
}
@@ -213,4 +238,50 @@ public class HiveORCAccessor extends HiveAccessor {
return true;
}
+ /**
+ * Fetches file-level statistics from an ORC file.
+ */
+ @Override
+ public void retrieveStats() throws Exception {
+ if (!this.useStats) {
+ throw new IllegalStateException("Accessor is not using statistics in current context.");
+ }
+ /*
+ * We use file-level stats, so when a file has multiple splits it is
+ * enough to return the count for the first split only; otherwise the
+ * count would be duplicated across splits.
+ */
+ if (inputData.getFragmentIndex() == 0) {
+ this.count = this.orcReader.getNumberOfRows();
+ rowToEmitCount = readNextObject();
+ }
+ statsInitialized = true;
+
+ }
+
+ /**
+ * Emits a tuple without reading from disk; currently only COUNT is supported
+ */
+ @Override
+ public OneRow emitAggObject() {
+ if(!statsInitialized) {
+ throw new IllegalStateException("retrieveStats() should be called before calling emitAggObject()");
+ }
+ OneRow row = null;
+ if (inputData.getAggType() == null)
+ throw new UnsupportedOperationException("Aggregate operation is required");
+ switch (inputData.getAggType()) {
+ case COUNT:
+ if (objectsEmitted < count) {
+ objectsEmitted++;
+ row = rowToEmitCount;
+ }
+ break;
+ default: {
+ throw new UnsupportedOperationException("Aggregation operation is not supported.");
+ }
+ }
+ return row;
+ }
+
}
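
Putting the pieces together, a COUNT(*) over a single split flows roughly as sketched below; the inputData variable is assumed to carry AGG-TYPE=count and fragment index 0, and the loop is what the AggBridge added below effectively performs:

    HiveORCAccessor accessor = new HiveORCAccessor(inputData);
    accessor.openForRead();        // stats path: opens the ORC reader, skips columns/filters
    accessor.retrieveStats();      // count = orcReader.getNumberOfRows()
    long rows = 0;
    while (accessor.emitAggObject() != null) {
        rows++;                    // the same cached row is emitted; nothing is read from disk
    }
    // rows equals the file's row count; splits with fragment index > 0 emit nothing,
    // so the total is not duplicated across splits
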
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index 3328c9f..808c415 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -30,6 +30,8 @@ import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
@@ -39,6 +41,11 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.*;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hawq.pxf.api.Fragmenter;
import org.apache.hawq.pxf.api.Metadata;
import org.apache.hawq.pxf.api.Metadata.Field;
@@ -48,12 +55,9 @@ import org.apache.hawq.pxf.api.utilities.EnumHawqType;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.api.io.DataType;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter;
import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter;
import org.apache.hawq.pxf.plugins.hive.HiveTablePartition;
-import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
import org.apache.hawq.pxf.plugins.hive.HiveUserData;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
@@ -627,4 +631,21 @@ public class HiveUtilities {
return deserializer;
}
+
+ /**
+ * Creates ORC file reader.
+ * @param inputData input data with given data source
+ * @return ORC file reader
+ */
+ public static Reader getOrcReader(InputData inputData) {
+ try {
+ Path path = new Path(inputData.getDataSource());
+ Reader reader = OrcFile.createReader(path.getFileSystem(new Configuration()), path);
+
+ return reader;
+
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while getting ORC reader", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 8b4bf13..daee331 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -20,10 +20,14 @@ package org.apache.hawq.pxf.plugins.hive;
*/
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.mapred.*;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
@@ -39,6 +43,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.SARG_PUSHDOWN;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -55,6 +61,7 @@ public class HiveORCAccessorTest {
@Mock OrcInputFormat orcInputFormat;
@Mock InputFormat inputFormat;
@Mock ColumnDescriptor columnDesc;
+ @Mock Reader orcReader;
JobConf jobConf;
HiveORCAccessor accessor;
@@ -65,6 +72,7 @@ public class HiveORCAccessorTest {
PowerMockito.mockStatic(HiveUtilities.class);
PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true, "1", ""));
+ PowerMockito.when(HiveUtilities.getOrcReader(any(InputData.class))).thenReturn(orcReader);
PowerMockito.mockStatic(HdfsUtilities.class);
@@ -121,4 +129,9 @@ public class HiveORCAccessorTest {
assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN));
}
+ @Test(expected=IllegalStateException.class)
+ public void emitAggObjectCountStatsNotInitialized() {
+ accessor.emitAggObject();
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
new file mode 100644
index 0000000..c03a6a2
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.collections.map.LRUMap;
+
+/**
+ * Bridge class optimized for aggregate queries.
+ *
+ */
+public class AggBridge extends ReadBridge implements Bridge {
+ private static final Log LOG = LogFactory.getLog(AggBridge.class);
+ /* Avoid resolving rows with the same key twice */
+ private LRUMap outputCache;
+
+ public AggBridge(ProtocolData protData) throws Exception {
+ super(protData);
+ }
+
+ @Override
+ public boolean beginIteration() throws Exception {
+ /* Initialize LRU cache with 100 items */
+ outputCache = new LRUMap();
+ boolean openForReadStatus = super.fileAccessor.openForRead();
+ ((StatsAccessor) fileAccessor).retrieveStats();
+ return openForReadStatus;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Writable getNext() throws Exception {
+ Writable output = null;
+ LinkedList<Writable> cachedOutput = null;
+ OneRow onerow = null;
+
+ if (!outputQueue.isEmpty()) {
+ return outputQueue.pop();
+ }
+
+ try {
+ while (outputQueue.isEmpty()) {
+ onerow = ((StatsAccessor) fileAccessor).emitAggObject();
+ if (onerow == null) {
+ break;
+ }
+ cachedOutput = (LinkedList<Writable>) outputCache.get(onerow.getKey());
+ if (cachedOutput == null) {
+ cachedOutput = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+ outputCache.put(onerow.getKey(), cachedOutput);
+ }
+ outputQueue.addAll(cachedOutput);
+ if (!outputQueue.isEmpty()) {
+ output = outputQueue.pop();
+ break;
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("Error occurred when reading next object from aggregate bridge:" + ex.getMessage());
+ throw ex;
+ }
+
+ return output;
+ }
+
+}
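
The outputCache above is a commons-collections LRUMap, which defaults to a maximum of 100 entries; the resolved output is cached per OneRow key so repeated keys skip field resolution. A minimal sketch of that behaviour (variable names are illustrative; assumed imports: org.apache.commons.collections.map.LRUMap, java.util.LinkedList, org.apache.hawq.pxf.service.io.Writable):

    LRUMap cache = new LRUMap();                        // equivalent to new LRUMap(100)
    Object key = "fragment-0";                          // stands in for onerow.getKey()
    LinkedList<Writable> resolved = new LinkedList<Writable>();
    cache.put(key, resolved);                           // cache the resolved output once
    boolean hit = cache.get(key) != null;               // later rows with the same key reuse it
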
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 104f353..4294e09 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -39,7 +39,8 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.catalina.connector.ClientAbortException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.AggBridge;
import org.apache.hawq.pxf.service.Bridge;
import org.apache.hawq.pxf.service.ReadBridge;
import org.apache.hawq.pxf.service.ReadSamplingBridge;
@@ -98,6 +99,8 @@ public class BridgeResource extends RestResource {
float sampleRatio = protData.getStatsSampleRatio();
if (sampleRatio > 0) {
bridge = new ReadSamplingBridge(protData);
+ } else if (Utilities.useAggBridge(protData)) {
+ bridge = new AggBridge(protData);
} else {
bridge = new ReadBridge(protData);
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index dc2a110..0cb6d47 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.ProfilesConf;
@@ -115,6 +116,18 @@ public class ProtocolData extends InputData {
// Store alignment for global use as a system property
System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
+
+ //Get aggregation operation
+ String aggTypeOperationName = getOptionalProperty("AGG-TYPE");
+
+ this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));
+
+ //Get fragment index
+ String fragmentIndexStr = getOptionalProperty("FRAGMENT-INDEX");
+
+ if (fragmentIndexStr != null) {
+ this.setFragmentIndex(Integer.parseInt(fragmentIndexStr));
+ }
}
/**
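
For reference, the two new values are read from the per-request properties, which arrive as X-GP-* headers from the HAWQ side (the AGG-TYPE header name follows the existing X-GP-<NAME> convention and is an assumption here; FRAGMENT-INDEX is set by the gpbridgeapi.c change below):

    X-GP-AGG-TYPE: count        ->  getAggType()       == EnumAggregationType.COUNT
    X-GP-FRAGMENT-INDEX: 0      ->  getFragmentIndex() == 0

An absent or unrecognized AGG-TYPE leaves getAggType() as null, so the regular ReadBridge path is used instead of AggBridge.
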
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index f176586..a853f06 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -220,6 +220,7 @@ void set_current_fragment_headers(gphadoop_context* context)
churl_headers_override(context->churl_headers, "X-GP-DATA-DIR", frag_data->source_name);
churl_headers_override(context->churl_headers, "X-GP-DATA-FRAGMENT", frag_data->index);
churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-METADATA", frag_data->fragment_md);
+ churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-INDEX", frag_data->index);
if (frag_data->user_data)
{