Posted to commits@hawq.apache.org by nh...@apache.org on 2015/11/20 22:50:40 UTC

[2/3] incubator-hawq git commit: HAWQ-44. Advanced statistics for PXF tables.

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 06ff72b..a734ade 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -33,6 +33,9 @@ public class ProtocolData extends InputData {
     protected String host;
     protected String profile;
     protected String token;
+    // statistics parameters
+    protected int statsMaxFragments;
+    protected float statsSampleRatio;
 
     /**
      * Constructs a ProtocolData. Parses X-GP-* configuration variables.
@@ -88,6 +91,10 @@ public class ProtocolData extends InputData {
         dataFragment = INVALID_SPLIT_IDX;
         parseDataFragment(getOptionalProperty("DATA-FRAGMENT"));
 
+        statsMaxFragments = 0;
+        statsSampleRatio = 0;
+        parseStatsParameters();
+
         // Store alignment for global use as a system property
         System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
     }
@@ -120,10 +127,8 @@ public class ProtocolData extends InputData {
         this.remoteLogin = copy.remoteLogin;
         this.remoteSecret = copy.remoteSecret;
         this.token = copy.token;
-    }
-
-    public String getToken() {
-        return token;
+        this.statsMaxFragments = copy.statsMaxFragments;
+        this.statsSampleRatio = copy.statsSampleRatio;
     }
 
     /**
@@ -264,6 +269,38 @@ public class ProtocolData extends InputData {
     }
 
     /**
+     * Returns Kerberos token information.
+     *
+     * @return token
+     */
+    public String getToken() {
+        return token;
+    }
+
+    /**
+     * Statistics parameter. Returns the maximum number of fragments to be
+     * processed for ANALYZE sampling. The value is set on the HAWQ side using
+     * the GUC pxf_stats_max_fragments.
+     *
+     * @return max number of fragments to be processed by analyze
+     */
+    public int getStatsMaxFragments() {
+        return statsMaxFragments;
+    }
+
+    /**
+     * Statistics parameter. Returns a number between 0.0001 and 1.0,
+     * representing the sampling ratio on each fragment for ANALYZE sampling.
+     * The value is set on the HAWQ side based on ANALYZE computations and the
+     * number of sampled fragments.
+     *
+     * @return sampling ratio
+     */
+    public float getStatsSampleRatio() {
+        return statsSampleRatio;
+    }
+
+    /**
      * Sets the thread safe parameter. Default value - true.
      */
     private void parseThreadSafe() {
@@ -371,4 +408,34 @@ public class ProtocolData extends InputData {
         remoteLogin = getOptionalProperty("REMOTE-USER");
         remoteSecret = getOptionalProperty("REMOTE-PASS");
     }
+
+    private void parseStatsParameters() {
+
+        String maxFrags = getOptionalProperty("STATS-MAX-FRAGMENTS");
+        if (!StringUtils.isEmpty(maxFrags)) {
+            statsMaxFragments = Integer.parseInt(maxFrags);
+            if (statsMaxFragments <= 0) {
+                throw new IllegalArgumentException("Wrong value '"
+                        + statsMaxFragments + "'. "
+                        + "STATS-MAX-FRAGMENTS must be a positive integer");
+            }
+        }
+
+        String sampleRatioStr = getUserProperty("STATS-SAMPLE-RATIO");
+        if (!StringUtils.isEmpty(sampleRatioStr)) {
+            statsSampleRatio = Float.parseFloat(sampleRatioStr);
+            if (statsSampleRatio < 0.0001 || statsSampleRatio > 1.0) {
+                throw new IllegalArgumentException(
+                        "Wrong value '"
+                                + statsSampleRatio
+                                + "'. "
+                                + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+            }
+        }
+
+        if ((statsSampleRatio > 0) != (statsMaxFragments > 0)) {
+            throw new IllegalArgumentException(
+                    "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together");
+        }
+    }
 }
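
For context, the two new parameters arrive as X-GP-* request headers and surface
through the getters added above. A minimal usage sketch, mirroring the unit tests
further down (values are illustrative; the constructor also needs the usual
X-GP-ALIGNMENT, X-GP-ATTRS, etc. properties):

    Map<String, String> parameters = new HashMap<String, String>();
    parameters.put("X-GP-STATS-MAX-FRAGMENTS", "100");  // positive integer
    parameters.put("X-GP-STATS-SAMPLE-RATIO", "0.25");  // between 0.0001 and 1.0
    ProtocolData protData = new ProtocolData(parameters);
    int maxFragments = protData.getStatsMaxFragments(); // 100
    float ratio = protData.getStatsSampleRatio();       // 0.25f
    // Setting only one of the two throws IllegalArgumentException.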

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
index 137622d..57fb0f4 100644
--- a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
@@ -2,66 +2,156 @@ package org.apache.hawq.pxf.service;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Test;
-
 import org.apache.hawq.pxf.api.BadRecordException;
 import org.apache.hawq.pxf.api.OneField;
 import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.BufferWritable;
 import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.Writable;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.junit.Test;
 
 public class BridgeOutputBuilderTest {
 
+    /**
+     * Test class to check the data inside BufferWritable.
+     */
+    private class DataOutputToBytes implements DataOutput {
+
+        byte[] output;
+
+        public byte[] getOutput() {
+            return output;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            output = b;
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeBoolean(boolean v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeByte(int v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeShort(int v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeChar(int v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeInt(int v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeLong(long v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeFloat(float v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeDouble(double v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeBytes(String s) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeChars(String s) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeUTF(String s) throws IOException {
+            throw new IOException("not implemented");
+        }
+    }
+
     private static final int UN_SUPPORTED_TYPE = -1;
     private GPDBWritable output = null;
+    private DataOutputToBytes dos = new DataOutputToBytes();
 
     @Test
     public void testFillGPDBWritable() throws Exception {
         Map<String, String> parameters = new HashMap<String, String>();
         parameters.put("X-GP-ATTRS", "14");
 
-        addColumn(parameters, 0, DataType.INTEGER,   "col0");
-        addColumn(parameters, 1, DataType.FLOAT8,    "col1");
-        addColumn(parameters, 2, DataType.REAL,      "col2");
-        addColumn(parameters, 3, DataType.BIGINT,    "col3");
-        addColumn(parameters, 4, DataType.SMALLINT,  "col4");
-        addColumn(parameters, 5, DataType.BOOLEAN,   "col5");
-        addColumn(parameters, 6, DataType.BYTEA,     "col6");
-        addColumn(parameters, 7, DataType.VARCHAR,   "col7");
-        addColumn(parameters, 8, DataType.BPCHAR,    "col8");
-        addColumn(parameters, 9, DataType.CHAR,      "col9");
-        addColumn(parameters, 10, DataType.TEXT,     "col10");
-        addColumn(parameters, 11, DataType.NUMERIC,  "col11");
-        addColumn(parameters, 12, DataType.TIMESTAMP,"col12");
-        addColumn(parameters, 13, DataType.DATE,     "col13");
-
+        addColumn(parameters, 0, DataType.INTEGER, "col0");
+        addColumn(parameters, 1, DataType.FLOAT8, "col1");
+        addColumn(parameters, 2, DataType.REAL, "col2");
+        addColumn(parameters, 3, DataType.BIGINT, "col3");
+        addColumn(parameters, 4, DataType.SMALLINT, "col4");
+        addColumn(parameters, 5, DataType.BOOLEAN, "col5");
+        addColumn(parameters, 6, DataType.BYTEA, "col6");
+        addColumn(parameters, 7, DataType.VARCHAR, "col7");
+        addColumn(parameters, 8, DataType.BPCHAR, "col8");
+        addColumn(parameters, 9, DataType.CHAR, "col9");
+        addColumn(parameters, 10, DataType.TEXT, "col10");
+        addColumn(parameters, 11, DataType.NUMERIC, "col11");
+        addColumn(parameters, 12, DataType.TIMESTAMP, "col12");
+        addColumn(parameters, 13, DataType.DATE, "col13");
 
         BridgeOutputBuilder builder = makeBuilder(parameters);
         output = builder.makeGPDBWritableOutput();
 
-        List<OneField> recFields = Arrays.asList(new OneField(DataType.INTEGER.getOID(), 0),
-                new OneField(DataType.FLOAT8.getOID(), (double) 0),
-                new OneField(DataType.REAL.getOID(), (float) 0),
-                new OneField(DataType.BIGINT.getOID(), (long) 0),
-                new OneField(DataType.SMALLINT.getOID(), (short) 0),
-                new OneField(DataType.BOOLEAN.getOID(), true),
-                new OneField(DataType.BYTEA.getOID(), new byte[]{0}),
-                new OneField(DataType.VARCHAR.getOID(), "value"),
-                new OneField(DataType.BPCHAR.getOID(), "value"),
-                new OneField(DataType.CHAR.getOID(), "value"),
-                new OneField(DataType.TEXT.getOID(), "value"),
-                new OneField(DataType.NUMERIC.getOID(), "0"),
-                new OneField(DataType.TIMESTAMP.getOID(), new Timestamp(0)),
+        List<OneField> recFields = Arrays.asList(
+                new OneField(DataType.INTEGER.getOID(), 0), new OneField(
+                        DataType.FLOAT8.getOID(), (double) 0), new OneField(
+                        DataType.REAL.getOID(), (float) 0), new OneField(
+                        DataType.BIGINT.getOID(), (long) 0), new OneField(
+                        DataType.SMALLINT.getOID(), (short) 0), new OneField(
+                        DataType.BOOLEAN.getOID(), true), new OneField(
+                        DataType.BYTEA.getOID(), new byte[] { 0 }),
+                new OneField(DataType.VARCHAR.getOID(), "value"), new OneField(
+                        DataType.BPCHAR.getOID(), "value"), new OneField(
+                        DataType.CHAR.getOID(), "value"), new OneField(
+                        DataType.TEXT.getOID(), "value"), new OneField(
+                        DataType.NUMERIC.getOID(), "0"), new OneField(
+                        DataType.TIMESTAMP.getOID(), new Timestamp(0)),
                 new OneField(DataType.DATE.getOID(), new Date(1)));
         builder.fillGPDBWritable(recFields);
 
@@ -71,30 +161,33 @@ public class BridgeOutputBuilderTest {
         assertEquals(output.getLong(3), Long.valueOf(0));
         assertEquals(output.getShort(4), Short.valueOf((short) 0));
         assertEquals(output.getBoolean(5), true);
-        assertArrayEquals(output.getBytes(6), new byte[]{0});
+        assertArrayEquals(output.getBytes(6), new byte[] { 0 });
         assertEquals(output.getString(7), "value\0");
         assertEquals(output.getString(8), "value\0");
         assertEquals(output.getString(9), "value\0");
         assertEquals(output.getString(10), "value\0");
         assertEquals(output.getString(11), "0\0");
         assertEquals(Timestamp.valueOf(output.getString(12)), new Timestamp(0));
-        assertEquals(Date.valueOf(output.getString(13).trim()).toString(), new Date(1).toString());
+        assertEquals(Date.valueOf(output.getString(13).trim()).toString(),
+                new Date(1).toString());
     }
 
     @Test
     public void testFillOneGPDBWritableField() throws Exception {
         Map<String, String> parameters = new HashMap<String, String>();
-        parameters.put("X-GP-ATTRS", "1");    	
-        addColumn(parameters, 0, DataType.INTEGER, "col0");    	
+        parameters.put("X-GP-ATTRS", "1");
+        addColumn(parameters, 0, DataType.INTEGER, "col0");
         BridgeOutputBuilder builder = makeBuilder(parameters);
         output = builder.makeGPDBWritableOutput();
 
-        OneField unSupportedField = new OneField(UN_SUPPORTED_TYPE, new Byte((byte) 0));
+        OneField unSupportedField = new OneField(UN_SUPPORTED_TYPE, new Byte(
+                (byte) 0));
         try {
             builder.fillOneGPDBWritableField(unSupportedField, 0);
             fail("Unsupported data type should throw exception");
         } catch (UnsupportedOperationException e) {
-            assertEquals(e.getMessage(), "Byte is not supported for HAWQ conversion");
+            assertEquals(e.getMessage(),
+                    "Byte is not supported for HAWQ conversion");
         }
     }
 
@@ -113,10 +206,10 @@ public class BridgeOutputBuilderTest {
 
         /* all four fields */
         List<OneField> complete = Arrays.asList(
-                new OneField(DataType.INTEGER.getOID(), 10),
-                new OneField(DataType.INTEGER.getOID(), 20),
-                new OneField(DataType.INTEGER.getOID(), 30),
-                new OneField(DataType.INTEGER.getOID(), 40));
+                new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+                        DataType.INTEGER.getOID(), 20), new OneField(
+                        DataType.INTEGER.getOID(), 30), new OneField(
+                        DataType.INTEGER.getOID(), 40));
         builder.fillGPDBWritable(complete);
         assertEquals(output.getColType().length, 4);
         assertEquals(output.getInt(0), Integer.valueOf(10));
@@ -126,13 +219,14 @@ public class BridgeOutputBuilderTest {
 
         /* two fields instead of four */
         List<OneField> incomplete = Arrays.asList(
-        		new OneField(DataType.INTEGER.getOID(), 10),
-    			new OneField(DataType.INTEGER.getOID(), 20));
+                new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+                        DataType.INTEGER.getOID(), 20));
         try {
             builder.fillGPDBWritable(incomplete);
             fail("testRecordBiggerThanSchema should have failed on - Record has 2 fields but the schema size is 4");
         } catch (BadRecordException e) {
-            assertEquals(e.getMessage(), "Record has 2 fields but the schema size is 4");
+            assertEquals(e.getMessage(),
+                    "Record has 2 fields but the schema size is 4");
         }
     }
 
@@ -151,16 +245,17 @@ public class BridgeOutputBuilderTest {
 
         /* five fields instead of four */
         List<OneField> complete = Arrays.asList(
-                new OneField(DataType.INTEGER.getOID(), 10),
-                new OneField(DataType.INTEGER.getOID(), 20),
-                new OneField(DataType.INTEGER.getOID(), 30),
-                new OneField(DataType.INTEGER.getOID(), 40),
-                new OneField(DataType.INTEGER.getOID(), 50));
+                new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+                        DataType.INTEGER.getOID(), 20), new OneField(
+                        DataType.INTEGER.getOID(), 30), new OneField(
+                        DataType.INTEGER.getOID(), 40), new OneField(
+                        DataType.INTEGER.getOID(), 50));
         try {
             builder.fillGPDBWritable(complete);
             fail("testRecordBiggerThanSchema should have failed on - Record has 5 fields but the schema size is 4");
         } catch (BadRecordException e) {
-            assertEquals(e.getMessage(), "Record has 5 fields but the schema size is 4");
+            assertEquals(e.getMessage(),
+                    "Record has 5 fields but the schema size is 4");
         }
     }
 
@@ -179,25 +274,171 @@ public class BridgeOutputBuilderTest {
 
         /* last field is REAL while schema requires INT */
         List<OneField> complete = Arrays.asList(
-                new OneField(DataType.INTEGER.getOID(), 10),
-                new OneField(DataType.INTEGER.getOID(), 20),
-                new OneField(DataType.INTEGER.getOID(), 30),
-                new OneField(DataType.REAL.getOID(), 40.0));
+                new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+                        DataType.INTEGER.getOID(), 20), new OneField(
+                        DataType.INTEGER.getOID(), 30), new OneField(
+                        DataType.REAL.getOID(), 40.0));
         try {
             builder.fillGPDBWritable(complete);
             fail("testFieldTypeMismatch should have failed on - For field 3 schema requires type INTEGER but input record has type REAL");
         } catch (BadRecordException e) {
-            assertEquals(e.getMessage(), "For field col3 schema requires type INTEGER but input record has type REAL");
+            assertEquals(e.getMessage(),
+                    "For field col3 schema requires type INTEGER but input record has type REAL");
+        }
+    }
+
+    @Test
+    public void convertTextDataToLines() throws Exception {
+
+        String data = "Que sara sara\n" + "Whatever will be will be\n"
+                + "We are going\n" + "to Wembeley!\n";
+        byte[] dataBytes = data.getBytes();
+        String[] dataLines = new String[] {
+                "Que sara sara\n",
+                "Whatever will be will be\n",
+                "We are going\n",
+                "to Wembeley!\n" };
+
+        OneField field = new OneField(DataType.BYTEA.getOID(), dataBytes);
+        List<OneField> fields = new ArrayList<OneField>();
+        fields.add(field);
+
+        Map<String, String> parameters = new HashMap<String, String>();
+        parameters.put("X-GP-ATTRS", "1");
+        addColumn(parameters, 0, DataType.TEXT, "col0");
+        // activate sampling code
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "100");
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "1.00");
+
+        BridgeOutputBuilder builder = makeBuilder(parameters);
+        LinkedList<Writable> outputQueue = builder.makeOutput(fields);
+
+        assertEquals(4, outputQueue.size());
+
+        for (int i = 0; i < dataLines.length; ++i) {
+            Writable line = outputQueue.get(i);
+            compareBufferWritable(line, dataLines[i]);
         }
-    }   
 
-    private void addColumn(Map<String, String> parameters, int idx, DataType dataType, String name) {
+        assertNull(builder.getPartialLine());
+    }
+
+    @Test
+    public void convertTextDataToLinesPartial() throws Exception {
+        String data = "oh well\n" + "what the hell";
+
+        OneField field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        List<OneField> fields = new ArrayList<OneField>();
+        fields.add(field);
+
+        Map<String, String> parameters = new HashMap<String, String>();
+        parameters.put("X-GP-ATTRS", "1");
+        addColumn(parameters, 0, DataType.TEXT, "col0");
+        // activate sampling code
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "100");
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "1.00");
+
+        BridgeOutputBuilder builder = makeBuilder(parameters);
+        LinkedList<Writable> outputQueue = builder.makeOutput(fields);
+
+        assertEquals(1, outputQueue.size());
+
+        Writable line = outputQueue.get(0);
+        compareBufferWritable(line, "oh well\n");
+
+        Writable partial = builder.getPartialLine();
+        assertNotNull(partial);
+        compareBufferWritable(partial, "what the hell");
+
+        // check that append works
+        data = " but the show must go on\n" + "!!!\n";
+        field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        fields.clear();
+        fields.add(field);
+
+        outputQueue = builder.makeOutput(fields);
+
+        assertNull(builder.getPartialLine());
+        assertEquals(2, outputQueue.size());
+
+        line = outputQueue.get(0);
+        compareBufferWritable(line, "what the hell but the show must go on\n");
+        line = outputQueue.get(1);
+        compareBufferWritable(line, "!!!\n");
+
+        // check that several partial lines get appended to each other
+        data = "I want to ride my bicycle\n" + "I want to ride my bike";
+
+        field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        fields.clear();
+        fields.add(field);
+
+        outputQueue = builder.makeOutput(fields);
+
+        assertEquals(1, outputQueue.size());
+
+        line = outputQueue.get(0);
+        compareBufferWritable(line, "I want to ride my bicycle\n");
+
+        partial = builder.getPartialLine();
+        assertNotNull(partial);
+        compareBufferWritable(partial, "I want to ride my bike");
+
+        // data consisting of one long line
+        data = " I want to ride my bicycle";
+
+        field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        fields.clear();
+        fields.add(field);
+
+        outputQueue = builder.makeOutput(fields);
+
+        assertEquals(0, outputQueue.size());
+
+        partial = builder.getPartialLine();
+        assertNotNull(partial);
+        compareBufferWritable(partial,
+                "I want to ride my bike I want to ride my bicycle");
+
+        // data with complete lines; the first one closes the previous partial line
+        data = " bicycle BICYCLE\n" + "bicycle BICYCLE\n";
+
+        field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        fields.clear();
+        fields.add(field);
+
+        outputQueue = builder.makeOutput(fields);
+
+        assertEquals(2, outputQueue.size());
+
+        line = outputQueue.get(0);
+        compareBufferWritable(line,
+                "I want to ride my bike I want to ride my bicycle bicycle BICYCLE\n");
+        line = outputQueue.get(1);
+        compareBufferWritable(line, "bicycle BICYCLE\n");
+
+        partial = builder.getPartialLine();
+        assertNull(partial);
+
+    }
+
+    private void compareBufferWritable(Writable line, String expected)
+            throws IOException {
+        assertTrue(line instanceof BufferWritable);
+        line.write(dos);
+        assertArrayEquals(expected.getBytes(), dos.getOutput());
+    }
+
+    private void addColumn(Map<String, String> parameters, int idx,
+                           DataType dataType, String name) {
         parameters.put("X-GP-ATTR-NAME" + idx, name);
-        parameters.put("X-GP-ATTR-TYPECODE" + idx, Integer.toString(dataType.getOID()));
+        parameters.put("X-GP-ATTR-TYPECODE" + idx,
+                Integer.toString(dataType.getOID()));
         parameters.put("X-GP-ATTR-TYPENAME" + idx, dataType.toString());
     }
 
-    private BridgeOutputBuilder makeBuilder(Map<String, String> parameters) throws Exception {
+    private BridgeOutputBuilder makeBuilder(Map<String, String> parameters)
+            throws Exception {
 
         parameters.put("X-GP-ALIGNMENT", "8");
         parameters.put("X-GP-SEGMENT-ID", "-44");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
new file mode 100644
index 0000000..b5d4f9a
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
@@ -0,0 +1,225 @@
+package org.apache.hawq.pxf.service;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.ReadSamplingBridge;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ AnalyzeUtils.class, ReadSamplingBridge.class })
+public class ReadSamplingBridgeTest {
+
+    /**
+     * Writable test object to test ReadSamplingBridge. The object receives a
+     * string and returns it in its toString function.
+     */
+    public class WritableTest implements Writable {
+
+        private String data;
+
+        public WritableTest(String data) {
+            this.data = data;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public String toString() {
+            return data;
+        }
+
+    }
+
+    private ProtocolData mockProtData;
+    private ReadBridge mockBridge;
+    private ReadSamplingBridge readSamplingBridge;
+    private int recordsLimit = 0;
+    private BitSet samplingBitSet;
+    private Writable result;
+
+    @Test
+    public void getNextRecord100Percent() throws Exception {
+
+        samplingBitSet.set(0, 100);
+        recordsLimit = 100;
+        when(mockProtData.getStatsSampleRatio()).thenReturn((float) 1.0);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        result = readSamplingBridge.getNext();
+        assertEquals("0", result.toString());
+
+        result = readSamplingBridge.getNext();
+        assertEquals("1", result.toString());
+
+        when(mockBridge.getNext()).thenReturn(null);
+
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Test
+    public void getNextRecord100Records10Percent() throws Exception {
+
+        // set 10 bits from 5 to 14.
+        samplingBitSet.set(5, 15);
+        recordsLimit = 100;
+        when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.1);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        for (int i = 0; i < 10; i++) {
+            result = readSamplingBridge.getNext();
+            assertEquals("" + (i + 5), result.toString());
+        }
+
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Test
+    public void getNextRecord100Records90Percent() throws Exception {
+        int expected = 0;
+
+        // set only the odd bits below 20, then all bits from 20 to 99.
+        // total: 90.
+        samplingBitSet.set(0, 100);
+        for (int i = 0; i < 10; i++) {
+            samplingBitSet.flip(i * 2);
+        }
+        recordsLimit = 100;
+        when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.9);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        for (int i = 0; i < 90; i++) {
+            result = readSamplingBridge.getNext();
+            if (i < 10) {
+                expected = (i * 2) + 1;
+            } else {
+                expected = i + 10;
+            }
+            assertEquals("" + expected, result.toString());
+        }
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Test
+    public void getNextRecord350Records50Percent() throws Exception {
+
+        // set bits 0, 40-79 (40) and 90-98 (9)
+        // total 50.
+        samplingBitSet.set(0);
+        samplingBitSet.set(40, 80);
+        samplingBitSet.set(90, 99);
+        recordsLimit = 350;
+        when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.5);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        /*
+         * expecting to have: 50 (out of first 100), 50 (out of second 100),
+         * 50 (out of third 100), 11 (out of last 50) --- 161 records in total
+         */
+        for (int i = 0; i < 161; i++) {
+            result = readSamplingBridge.getNext();
+            assertNotNull(result);
+            if (i % 50 == 0) {
+                assertEquals("" + (i * 2), result.toString());
+            }
+        }
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Test
+    public void getNextRecord100000Records30Sample() throws Exception {
+        int expected = 0;
+
+        // ratio = 0.0003
+        float ratio = (float) (30.0 / 100000.0);
+
+        // set 3 records in every 10000.
+        samplingBitSet.set(99);
+        samplingBitSet.set(999);
+        samplingBitSet.set(9999);
+        recordsLimit = 100000;
+        when(mockProtData.getStatsSampleRatio()).thenReturn(ratio);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        for (int i = 0; i < 30; i++) {
+            result = readSamplingBridge.getNext();
+            assertNotNull(result);
+            int residue = i % 3;
+            int div = i / 3;
+            if (residue == 0) {
+                expected = 99 + (div * 10000);
+            } else if (residue == 1) {
+                expected = 999 + (div * 10000);
+            } else {
+                expected = 9999 + (div * 10000);
+            }
+            assertEquals("" + expected, result.toString());
+        }
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        mockProtData = mock(ProtocolData.class);
+
+        mockBridge = mock(ReadBridge.class);
+        PowerMockito.whenNew(ReadBridge.class).withAnyArguments().thenReturn(
+                mockBridge);
+
+        when(mockBridge.getNext()).thenAnswer(new Answer<Writable>() {
+            private int count = 0;
+
+            @Override
+            public Writable answer(InvocationOnMock invocation)
+                    throws Throwable {
+                if (count >= recordsLimit) {
+                    return null;
+                }
+                return new WritableTest("" + (count++));
+            }
+        });
+
+        PowerMockito.mockStatic(AnalyzeUtils.class);
+        samplingBitSet = new BitSet();
+        when(
+                AnalyzeUtils.generateSamplingBitSet(any(int.class),
+                        any(int.class))).thenReturn(samplingBitSet);
+    }
+}
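
The tests above feed ReadSamplingBridge from a mocked ReadBridge and a fixed
sampling BitSet. The behavior they verify: a record is emitted only when the bit
at its index, taken modulo the sampling window, is set. A sketch of that loop,
assuming bridge, samplingBitSet, windowSize and index are fields, and that the
window size is the bit set's nominal size divided by the sample ratio
(illustrative, not the bridge's actual code):

    // e.g. 10 set bits and ratio 0.1 give a window of 100 records
    private Writable getNextSampled() throws Exception {
        Writable record;
        while ((record = bridge.getNext()) != null) {
            int pos = index % windowSize; // index counts records read so far
            index++;
            if (samplingBitSet.get(pos)) {
                return record; // sampled record
            }
        }
        return null; // underlying bridge exhausted
    }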

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
new file mode 100644
index 0000000..984fcf6
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
@@ -0,0 +1,22 @@
+package org.apache.hawq.pxf.service.io;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class BufferWritableTest {
+
+    @Test
+    public void append() throws Exception {
+        String data1 = "פרק ראשון ובו יסופר יסופר";
+        String data2 = "פרק שני ובו יסופר יסופר";
+
+        BufferWritable bw1 = new BufferWritable(data1.getBytes());
+
+        assertArrayEquals(data1.getBytes(), bw1.buf);
+
+        bw1.append(data2.getBytes());
+
+        assertArrayEquals((data1+data2).getBytes(), bw1.buf);
+    }
+}
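
The assertion pins down the expected semantics of append: the new bytes are
concatenated after the existing buffer. A minimal sketch of such an append
(hypothetical variable names; the real implementation may differ):

    byte[] merged = new byte[buf.length + more.length];
    System.arraycopy(buf, 0, merged, 0, buf.length);
    System.arraycopy(more, 0, merged, buf.length, more.length);
    buf = merged;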

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
new file mode 100644
index 0000000..39bb266
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
@@ -0,0 +1,117 @@
+package org.apache.hawq.pxf.service.utilities;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+public class AnalyzeUtilsTest {
+
+    @Test
+    public void generateSamplingBitSet() throws Exception {
+        runGenerateSamplingBitSetTest(10, 5, new int[]{0, 3, 4, 6, 9});
+
+        runGenerateSamplingBitSetTest(9, 8, new int[] {0, 2, 3, 4, 5, 6, 7, 8});
+
+        runGenerateSamplingBitSetTest(10, 10, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+        runGenerateSamplingBitSetTest(8, 0, new int[]{});
+
+        runGenerateSamplingBitSetTest(8, 3, new int[]{0, 3, 6});
+    }
+
+    @Test
+    public void generateSamplingBitSetBig() throws Exception {
+        BitSet result = AnalyzeUtils.generateSamplingBitSet(1000000, 990000);
+        assertEquals(result.cardinality(), 990000);
+        assertTrue(result.length() < 1000000);
+
+        result = AnalyzeUtils.generateSamplingBitSet(1000000000, 5000000);
+        assertEquals(result.cardinality(), 5000000);
+        assertTrue(result.length() < 1000000000);
+    }
+
+    @Test
+    public void getSampleFragments() throws Exception {
+        // fragments less than threshold
+        runGetSampleFragmentsTest(4, 100, 4, new int[] {0, 1, 2, 3});
+
+        // fragments over threshold
+        runGetSampleFragmentsTest(4, 2, 2, new int[]{0, 3});
+        runGetSampleFragmentsTest(10, 2, 2, new int[]{0, 6});
+        runGetSampleFragmentsTest(10, 3, 3, new int[]{0, 4, 8});
+        runGetSampleFragmentsTest(10, 9, 9, new int[]{0, 1, 2, 4, 5, 6, 7, 8, 9 });
+        runGetSampleFragmentsTest(15, 10, 10, new int[]{0, 2, 3, 4, 6, 7, 8, 10, 12, 14});
+        runGetSampleFragmentsTest(1000, 10, 10,
+                new int[]{0, 101, 202, 303, 404, 505, 606, 707, 808, 909});
+        runGetSampleFragmentsTest(100, 65, 65,
+                new int[]{0, 1, 2, 4, 5, 6, 8, 9, 10,       /* 9 elements */
+                          12, 13, 14, 16, 17, 18,           /* 6 elements */
+                          20, 21, 22, 24, 25, 26, 28, 29,   /* 8 elements */
+                          30, 32, 33, 34, 36, 37, 38,       /* 7 elements */
+                          40, 41, 42, 44, 45, 46, 48, 49,   /* 8 elements */
+                          50, 52, 53, 54, 56, 57, 58,       /* 7 elements */
+                          60, 62, 64, 66, 68,               /* 5 elements */
+                          70, 72, 74, 76, 78,               /* 5 elements */
+                          80, 82, 84, 86, 88,               /* 5 elements */
+                          90, 92, 94, 96, 98                /* 5 elements */
+                          });
+                                                            /* => 65 elements */
+        // threshold illegal and ignored
+        runGetSampleFragmentsTest(10, 0, 10, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+    }
+
+    private void runGenerateSamplingBitSetTest(int poolSize, int sampleSize, int[] expectedIndexes) throws Exception {
+        BitSet expected = new BitSet();
+        for (int i: expectedIndexes) {
+            expected.set(i);
+        }
+        BitSet result = AnalyzeUtils.generateSamplingBitSet(poolSize, sampleSize);
+
+        Assert.assertEquals(expected, result);
+    }
+
+    private void runGetSampleFragmentsTest(int inputSize, int maxFragments, int expectedSize, int[] expectedIndexes) throws Exception {
+        ProtocolData mockProtocolData = mock(ProtocolData.class);
+        when(mockProtocolData.getStatsMaxFragments()).thenReturn(maxFragments);
+
+        List<Fragment> fragments = new ArrayList<Fragment>();
+
+        for (int i = 0; i < inputSize; i++) {
+            fragments.add(prepareFragment(i));
+        }
+        assertEquals(inputSize, fragments.size());
+
+        List<Fragment> result = AnalyzeUtils.getSampleFragments(fragments, mockProtocolData);
+
+        List<Fragment> expected = new ArrayList<Fragment>();
+
+        for (int i: expectedIndexes) {
+            expected.add(prepareFragment(i));
+        }
+
+        assertEquals("verify number of returned fragments", expectedSize, result.size());
+
+        for (int i = 0; i < expectedSize; i++) {
+            Assert.assertEquals("compare fragment #" + i, expected.get(i).getIndex(), result.get(i).getIndex());
+        }
+    }
+
+    private Fragment prepareFragment(int i) {
+        Fragment fragment = new Fragment("fragment" + i, null, null);
+        fragment.setIndex(i);
+        return fragment;
+    }
+}
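
Taken together, these tests document the two AnalyzeUtils entry points used by
the sampling flow. A short usage sketch (signatures as exercised above):

    // choose which records to sample within a fragment's record pool
    BitSet bits = AnalyzeUtils.generateSamplingBitSet(10, 5); // e.g. bits {0, 3, 4, 6, 9}
    // choose which fragments to process, spread evenly across the list,
    // honoring the STATS-MAX-FRAGMENTS limit carried by ProtocolData
    List<Fragment> sample = AnalyzeUtils.getSampleFragments(fragments, protocolData);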

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
index 194f148..bd31b19 100644
--- a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({UserGroupInformation.class, ProfilesConf.class})
+@PrepareForTest({ UserGroupInformation.class, ProfilesConf.class })
 public class ProtocolDataTest {
     Map<String, String> parameters;
 
@@ -47,7 +47,8 @@ public class ProtocolDataTest {
         assertEquals(protocolData.getAccessor(), "are");
         assertEquals(protocolData.getResolver(), "packed");
         assertEquals(protocolData.getDataSource(), "i'm/ready/to/go");
-        assertEquals(protocolData.getUserProperty("i'm-standing-here"), "outside-your-door");
+        assertEquals(protocolData.getUserProperty("i'm-standing-here"),
+                "outside-your-door");
         assertEquals(protocolData.getParametersMap(), parameters);
         assertNull(protocolData.getLogin());
         assertNull(protocolData.getSecret());
@@ -66,10 +67,12 @@ public class ProtocolDataTest {
 
         Map<String, String> mockedProfiles = new HashMap<>();
         mockedProfiles.put("wHEn you trY yOUR bESt", "but you dont succeed");
-        mockedProfiles.put("when YOU get WHAT you WANT", "but not what you need");
+        mockedProfiles.put("when YOU get WHAT you WANT",
+                "but not what you need");
         mockedProfiles.put("when you feel so tired", "but you cant sleep");
 
-        when(ProfilesConf.getProfilePluginsMap("a profile")).thenReturn(mockedProfiles);
+        when(ProfilesConf.getProfilePluginsMap("a profile")).thenReturn(
+                mockedProfiles);
 
         parameters.put("x-gp-profile", "a profile");
         parameters.put("when you try your best", "and you do succeed");
@@ -79,7 +82,8 @@ public class ProtocolDataTest {
             new ProtocolData(parameters);
             fail("Duplicate property should throw IllegalArgumentException");
         } catch (IllegalArgumentException iae) {
-            assertEquals("Profile 'a profile' already defines: [when YOU get WHAT you WANT, wHEn you trY yOUR bESt]",
+            assertEquals(
+                    "Profile 'a profile' already defines: [when YOU get WHAT you WANT, wHEn you trY yOUR bESt]",
                     iae.getMessage());
         }
     }
@@ -169,8 +173,9 @@ public class ProtocolDataTest {
             new ProtocolData(parameters);
             fail("should fail with bad fragment metadata");
         } catch (Exception e) {
-            assertEquals(e.getMessage(), "Fragment metadata information must be Base64 encoded." +
-                    "(Bad value: " + badValue + ")");
+            assertEquals(e.getMessage(),
+                    "Fragment metadata information must be Base64 encoded."
+                            + "(Bad value: " + badValue + ")");
         }
     }
 
@@ -197,6 +202,116 @@ public class ProtocolDataTest {
         assertEquals("UTF8_計算機用語_00000000", protocolData.getFilterString());
     }
 
+    @Test
+    public void noStatsParams() {
+        ProtocolData protData = new ProtocolData(parameters);
+
+        assertEquals(0, protData.getStatsMaxFragments());
+        assertEquals(0, protData.getStatsSampleRatio(), 0.1);
+    }
+
+    @Test
+    public void statsParams() {
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "10101");
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "0.039");
+
+        ProtocolData protData = new ProtocolData(parameters);
+
+        assertEquals(10101, protData.getStatsMaxFragments());
+        assertEquals(0.039, protData.getStatsSampleRatio(), 0.01);
+    }
+
+    @Test
+    public void statsMissingParams() {
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "13");
+        try {
+            new ProtocolData(parameters);
+            fail("missing X-GP-STATS-SAMPLE-RATIO parameter");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together");
+        }
+
+        parameters.remove("X-GP-STATS-MAX-FRAGMENTS");
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "1");
+        try {
+            new ProtocolData(parameters);
+            fail("missing X-GP-STATS-MAX-FRAGMENTS parameter");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together");
+        }
+    }
+
+    @Test
+    public void statsSampleRatioNegative() {
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "101");
+
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Wrong value '101.0'. "
+                            + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+        }
+
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "0");
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Wrong value '0.0'. "
+                            + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+        }
+
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "0.00005");
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Wrong value '5.0E-5'. "
+                            + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+        }
+
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "a");
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+        } catch (NumberFormatException e) {
+            assertEquals(e.getMessage(), "For input string: \"a\"");
+        }
+    }
+
+    @Test
+    public void statsMaxFragmentsNegative() {
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "10.101");
+
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-MAX-FRAGMENTS value");
+        } catch (NumberFormatException e) {
+            assertEquals(e.getMessage(), "For input string: \"10.101\"");
+        }
+
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "0");
+
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-MAX-FRAGMENTS value");
+        } catch (IllegalArgumentException e) {
+            assertEquals(e.getMessage(), "Wrong value '0'. "
+                    + "STATS-MAX-FRAGMENTS must be a positive integer");
+        }
+    }
+
     /*
      * setUp function called before each test
      */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/access/external/Makefile b/src/backend/access/external/Makefile
index afc3734..fec5a34 100644
--- a/src/backend/access/external/Makefile
+++ b/src/backend/access/external/Makefile
@@ -10,7 +10,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = fileam.o url.o libchurl.o hd_work_mgr.o pxfuriparser.o pxfheaders.o \
-pxfmasterapi.o ha_config.o pxfcomutils.o pxfutils.o pxffilters.o
+pxfmasterapi.o ha_config.o pxfcomutils.o pxfutils.o pxffilters.o pxfanalyze.o
 
 include $(top_srcdir)/src/backend/common.mk
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c
index 36f11a8..6aa0736 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -233,51 +233,41 @@ static void assign_pxf_port_to_fragments(int remote_rest_port, List *fragments)
 }
 
 /*
- * Fetches statistics of the PXF datasource from the PXF service
+ * Fetches fragment statistics of the PXF datasource from the PXF service
  *
- * The function will generate a delegation token when secure filesystem mode 
+ * The function will generate a delegation token when secure filesystem mode
  * is on and cancel it right after.
  */
-PxfStatsElem *get_pxf_statistics(char *uri, Relation rel, StringInfo err_msg)
+PxfFragmentStatsElem *get_pxf_fragments_statistics(char *uri, Relation rel)
 {
 	ClientContext client_context; /* holds the communication info */
 	char *analyzer = NULL;
 	char *profile = NULL;
 	PxfInputData inputData = {0};
-	PxfStatsElem *result = NULL;
-	
+	PxfFragmentStatsElem *result = NULL;
+
 	GPHDUri* hadoop_uri = init(uri, &client_context);
 	if (!hadoop_uri)
-		return NULL;
-
-	/*
-	 * Get the statistics info from REST only if analyzer is defined
-     */
-	if(GPHDUri_get_value_for_opt(hadoop_uri, "analyzer", &analyzer, false) != 0 &&
-	   GPHDUri_get_value_for_opt(hadoop_uri, "profile", &profile, false) != 0)
 	{
-		if (err_msg)
-			appendStringInfo(err_msg, "no ANALYZER or PROFILE option in table definition");
-		return NULL;
+		elog(ERROR, "Failed to parse PXF location %s", uri);
 	}
-	
+
 	/*
 	 * Enrich the curl HTTP header
 	 */
 	inputData.headers = client_context.http_headers;
 	inputData.gphduri = hadoop_uri;
-	inputData.rel = rel; 
-	inputData.filterstr = NULL; /* We do not supply filter data to the HTTP header */	
+	inputData.rel = rel;
+	inputData.filterstr = NULL; /* We do not supply filter data to the HTTP header */
     generate_delegation_token(&inputData);
 	build_http_header(&inputData);
-	
-	result = get_data_statistics(hadoop_uri, &client_context, err_msg);
+
+	result = get_fragments_statistics(hadoop_uri, &client_context);
 
 	cancel_delegation_token(&inputData);
 	return result;
 }
 
-
 /*
  * Preliminary uri parsing and curl initializations for the REST communication
  */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/pxfanalyze.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfanalyze.c b/src/backend/access/external/pxfanalyze.c
new file mode 100644
index 0000000..eb5a17a
--- /dev/null
+++ b/src/backend/access/external/pxfanalyze.c
@@ -0,0 +1,740 @@
+/*-------------------------------------------------------------------------
+ *
+ * pxfanalyze.c
+ *	  Helper functions to perform ANALYZE on PXF tables.
+ *
+ * Copyright (c) 2007-2012, Greenplum inc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <curl/curl.h>
+#include <json/json.h>
+#include "access/hd_work_mgr.h"
+#include "access/pxfanalyze.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_exttable.h"
+#include "cdb/cdbanalyze.h"
+#include "commands/analyzeutils.h"
+#include "lib/stringinfo.h"
+#include "nodes/makefuncs.h"
+#include "utils/builtins.h"
+#include "utils/elog.h"
+#include "utils/guc.h"
+#include "utils/lsyscache.h"
+
+
+static void buildPxfTableCopy(Oid relationOid,
+		float4	samplingRatio,
+		int pxfStatMaxFragments,
+		const char* schemaName, const char* tableName,
+		const char* sampleSchemaName, const char* pxfSampleTable);
+static void buildSampleFromPxf(const char* sampleSchemaName,
+		const char* sampleTableName,
+		const char* pxfSampleTable,
+		List *lAttributeNames,
+		float4 *sampleTableRelTuples);
+
+static float4 calculateSamplingRatio(float4 relTuples,
+									 float4 relFrags,
+									 float4 requestedSampleSize);
+
+static char* parseFormat(char fmtcode);
+static char* escape_unprintables(const char *src);
+static char* escape_fmtopts_string(const char *src);
+static char* custom_fmtopts_string(const char *src);
+static void printExtTable(Oid relationOid, ExtTableEntry* extTable);
+static char* createPxfSampleStmt(Oid relationOid,
+		const char* schemaName, const char* tableName,
+		const char* sampleSchemaName, const char* pxfSampleTable,
+		float4 pxf_sample_ratio, int pxf_max_fragments);
+static float4 getPxfFragmentTupleCount(Oid relationOid);
+static float4 countFirstFragmentTuples(const char* schemaName,
+									   const char* tableName);
+static void getFragmentStats(Relation rel, StringInfo location,
+							 float4 *numfrags, float4 *firstfragsize,
+							 float4 *totalsize);
+
+
+void analyzePxfEstimateReltuplesRelpages(Relation relation,
+		StringInfo location,
+		float4* estimatedRelTuples,
+		float4* estimatedRelPages)
+{
+
+	float4 numFrags = 0.0;
+	float4 firstFragSize = 0.0;
+	float4 totalSize = 0.0;
+
+	float4 firstFragTuples = 0.0;
+	float4 estimatedTuples = 0.0;
+
+	/* get number of fragments, size of first fragment and total size.
+	 * This is used together with the number of tuples in the first fragment
+	 * to estimate the number of tuples in the table. */
+	getFragmentStats(relation, location, &numFrags, &firstFragSize, &totalSize);
+
+	/* get number of tuples from first fragment */
+	firstFragTuples = getPxfFragmentTupleCount(relation->rd_id);
+
+	/* calculate estimated tuple count */
+	if (firstFragTuples > 0)
+	{
+		Assert(firstFragSize > 0);
+		Assert(totalSize > 0);
+		/* The calculation:
+		 * size of each tuple = first fragment size / first fragment row count
+		 * total size = size of each tuple * number of tuples
+		 * number of tuples = total size / size of each tuple
+		 */
+		estimatedTuples = (totalSize / firstFragSize) * firstFragTuples;
+	}
+
+	elog(DEBUG2, "Estimated tuples for PXF table: %f. (first fragment count %f, fragments number %f, old estimate %f)",
+		 estimatedTuples, firstFragTuples, numFrags, *estimatedRelTuples);
+
+	*estimatedRelTuples = estimatedTuples;
+	*estimatedRelPages = numFrags;
+
+	/* relpages can't be 0 if there are tuples in the table. */
+	if ((*estimatedRelPages < 1.0) && (estimatedTuples > 0))
+	{
+		*estimatedRelPages = 1.0;
+	}
+
+	/* in case there were problems with the PXF service, keep the defaults */
+	if (*estimatedRelPages < 0)
+	{
+		*estimatedRelPages =  gp_external_table_default_number_of_pages;
+	}
+	if (*estimatedRelTuples < 0)
+	{
+		*estimatedRelTuples =  gp_external_table_default_number_of_tuples;
+	}
+}
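
To make the estimate concrete (illustrative numbers only): if the first fragment
is 128 MB and holds 1,000,000 tuples, and the total size across all fragments is
1,280 MB, the estimated tuple count is (1280 / 128) * 1,000,000 = 10,000,000.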
+
+/*
+ * Creates a sample table with data from a PXF table.
+ * We need to create a copy of the PXF table, in order to pass the sampling
+ * parameters pxf_sample_ratio and pxf_max_fragments as attributes,
+ * and to set a segment reject limit of 25 percent.
+ *
+ * The new PXF table is sampled and the results are saved in the returned sample table.
+ * Note that ANALYZE can be executed only by the database owner.
+ * It is safe to assume that the database owner has permissions to create temp tables.
+ * The sampling is done by uniformly sampling a pxf_sample_ratio fraction of the
+ * records in each fragment, from up to pxf_max_fragments fragments.
+ *
+ * Input:
+ * 	relationOid 	- relation to be sampled
+ * 	sampleTableName - sample table name, moderately unique
+ * 	lAttributeNames - attributes to be included in the sample
+ * 	relTuples		- estimated size of relation
+ * 	relFrags		- estimated number of fragments in relation
+ * 	requestedSampleSize - as determined by attribute statistics requirements.
+ * 	sampleTableRelTuples	- limit on size of the sample.
+ * Output:
+ * 	sampleTableRelTuples - number of tuples in the sample table created.
+ */
+Oid buildPxfSampleTable(Oid relationOid,
+		char* sampleTableName,
+		List *lAttributeNames,
+		float4	relTuples,
+		float4  relFrags,
+		float4 	requestedSampleSize,
+		float4 *sampleTableRelTuples)
+{
+	const char *schemaName = get_namespace_name(get_rel_namespace(relationOid)); /* must be pfreed */
+	const char *tableName = get_rel_name(relationOid); /* must be pfreed */
+	char	*sampleSchemaName = pstrdup("pg_temp");
+	char	*pxfSampleTable = temporarySampleTableName(relationOid, "pg_analyze_pxf"); /* must be pfreed */
+	Oid			sampleTableOid = InvalidOid;
+	Oid			pxfSampleTableOid = InvalidOid;
+	RangeVar 	*rangeVar = NULL;
+	float4 pxfSamplingRatio = 0.0;
+
+	Assert(requestedSampleSize > 0.0);
+	Assert(relTuples > 0.0);
+	Assert(relFrags > 0.0);
+
+	/* calculate pxf_sample_ratio */
+	pxfSamplingRatio = calculateSamplingRatio(relTuples, relFrags, requestedSampleSize);
+
+	/* build copy of original pxf table */
+	buildPxfTableCopy(relationOid,
+					  pxfSamplingRatio,
+					  pxf_stat_max_fragments,
+					  schemaName, tableName,
+					  sampleSchemaName, pxfSampleTable);
+
+	rangeVar = makeRangeVar(NULL /*catalogname*/, sampleSchemaName, pxfSampleTable, -1);
+	pxfSampleTableOid = RangeVarGetRelid(rangeVar, true /* failOK */, false /*allowHcatalog*/);
+
+	buildSampleFromPxf(sampleSchemaName, sampleTableName, pxfSampleTable,
+					   lAttributeNames, sampleTableRelTuples);
+
+	rangeVar = makeRangeVar(NULL /*catalogname*/, sampleSchemaName, sampleTableName, -1);
+	sampleTableOid = RangeVarGetRelid(rangeVar, true /* failOK */, false /*allowHcatalog*/);
+
+	Assert(sampleTableOid != InvalidOid);
+
+	/*
+	 * MPP-10723: Very rarely, we may be unlucky and generate an empty sample table.
+	 * We error out in this case rather than generate bad statistics.
+	 */
+
+	if (*sampleTableRelTuples < 1.0)
+	{
+		elog(ERROR, "ANALYZE unable to generate accurate statistics on table %s.%s. Try lowering gp_analyze_relative_error",
+				quote_identifier(schemaName),
+				quote_identifier(tableName));
+	}
+
+	if (pxfSampleTableOid != InvalidOid)
+	{
+		elog(DEBUG2, "ANALYZE dropping PXF sample table");
+		dropSampleTable(pxfSampleTableOid, true);
+	}
+
+	pfree((void *) rangeVar);
+	pfree((void *) pxfSampleTable);
+	pfree((void *) tableName);
+	pfree((void *) schemaName);
+	pfree((void *) sampleSchemaName);
+	return sampleTableOid;
+}
+
+/*
+ * Creates an external PXF table, with the same properties
+ * as the given PXF table to be sampled, except for 2 additional
+ * attributes in the LOCATION clause -
+ * pxf_stats_sample_ratio and pxf_stats_max_fragments -
+ * and a segment reject limit of 25 percent.
+ */
+static void buildPxfTableCopy(Oid relationOid,
+		float4 samplingRatio,
+		int pxfStatMaxFragments,
+		const char* schemaName, const char* tableName,
+		const char* sampleSchemaName, const char* pxfSampleTable)
+{
+
+	/* create table string */
+	char* createPxfSampleStr = createPxfSampleStmt(relationOid,
+			schemaName, tableName,
+			sampleSchemaName, pxfSampleTable,
+			samplingRatio, pxfStatMaxFragments);
+
+	spiExecuteWithCallback(createPxfSampleStr, false /*readonly*/, 0 /*tcount */,
+			NULL, NULL);
+
+	pfree(createPxfSampleStr);
+
+	elog(DEBUG2, "Created PXF table %s.%s for sampling PXF table %s.%s",
+			quote_identifier(sampleSchemaName),
+			quote_identifier(pxfSampleTable),
+			quote_identifier(schemaName),
+			quote_identifier(tableName));
+}
+
+/*
+ * Creates and populates a sample table for a PXF table.
+ * The actual queried table is not the original PXF table but a copy of it
+ * with additional attributes to enable sampling.
+ *
+ * The results are stored in sampleTableRelTuples.
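+ *
+ * The query has the following form (table and column names are illustrative):
+ *   create table pg_temp.<sample_table> as
+ *     (select Ta.col1, Ta.col2 from pg_temp.<pxf_copy_table> as Ta)
+ *     distributed randomly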
+ */
+static void buildSampleFromPxf(const char* sampleSchemaName,
+		const char* sampleTableName,
+		const char* pxfSampleTable,
+		List *lAttributeNames,
+		float4 *sampleTableRelTuples)
+{
+	int nAttributes = -1;
+	int i = 0;
+	ListCell *le = NULL;
+	StringInfoData str;
+
+	initStringInfo(&str);
+
+	appendStringInfo(&str, "create table %s.%s as (select ",
+			quote_identifier(sampleSchemaName), quote_identifier(sampleTableName));
+
+	nAttributes = list_length(lAttributeNames);
+
+	foreach_with_count(le, lAttributeNames, i)
+	{
+		appendStringInfo(&str, "Ta.%s", quote_identifier((const char *) lfirst(le)));
+		if (i < nAttributes - 1)
+		{
+			appendStringInfo(&str, ", ");
+		}
+		else
+		{
+			appendStringInfo(&str, " ");
+		}
+	}
+
+	appendStringInfo(&str, "from %s.%s as Ta) distributed randomly",
+			quote_identifier(sampleSchemaName),
+			quote_identifier(pxfSampleTable));
+
+	/* in case of PXF error, analyze on this table will be reverted */
+	spiExecuteWithCallback(str.data, false /*readonly*/, 0 /*tcount */,
+			spiCallback_getProcessedAsFloat4, sampleTableRelTuples);
+
+	pfree(str.data);
+
+	elog(DEBUG2, "Created sample table %s.%s with nrows=%.0f",
+			quote_identifier(sampleSchemaName),
+			quote_identifier(sampleTableName),
+			*sampleTableRelTuples);
+}
+
+/*
+ * Returns a sampling ratio - a fraction between 0.0001 and 1.0 -
+ * representing the fraction of records that should be sampled from
+ * each fragment of a PXF table.
+ * The ratio is calculated based on the tuples estimate of the table
+ * and on the number of the actually sampled fragments
+ * (GUC pxf_stat_max_fragments), by the following formula:
+ * ratio = (<sample size> / <tuples estimate>) * (<total # fragments> / <fragments to be sampled>)
+ * If the ratio is too big or small, it is corrected to 1.0 or 0.0001 respectively.
+ *
+ * Input:
+ * 	relTuples		- number of tuples in the table
+ * 	relFrags		- number of fragments in the table
+ * 	requestedSampleSize - number of sample tuples required
+ * Output:
+ * 	the sampling ratio for the table.
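+ *
+ * A worked example (illustrative numbers only): for relTuples = 1,000,000,
+ * relFrags = 100, requestedSampleSize = 10,000 and pxf_stat_max_fragments = 10,
+ * the base ratio is 10,000 / 1,000,000 = 0.01, and since only 10 of the 100
+ * fragments are sampled it is corrected by 100 / 10 = 10, giving 0.1.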
+ */
+static float4 calculateSamplingRatio(float4 relTuples,
+		 float4 relFrags,
+		 float4 requestedSampleSize)
+{
+	float4 sampleRatio = 0.0;
+
+	Assert(relFrags > 0);
+	Assert(relTuples > 0);
+	Assert(requestedSampleSize > 0);
+
+	/* sample ratio for regular tables */
+	sampleRatio = requestedSampleSize / relTuples;
+
+	if (pxf_stat_max_fragments < relFrags)
+	{
+		/*
+		 * Correct ratio according to the number of sampled fragments.
+		 * If there are less fragments to sample, the ratio should be increased.
+		 * If the corrected sampling ratio is > 100%, make it 100%
+		 */
+		sampleRatio = sampleRatio * (relFrags / pxf_stat_max_fragments);
+		if (sampleRatio > 1.0)
+		{
+			sampleRatio = 1.0;
+		}
+	}
+
+	/*
+	 * If the ratio is too low (< 0.0001), correct it to 0.0001.
+	 * That means that the lowest rate we will get is 1 tuple per 10,000.
+	 */
+	if (sampleRatio < 0.0001)
+	{
+		sampleRatio = 0.0001;
+	}
+
+	elog(DEBUG2, "PXF ANALYZE: pxf_stats_sample_ratio = %f, pxf_stats_max_fragments = %d, table fragments = %f",
+		 sampleRatio, pxf_stat_max_fragments, relFrags);
+	return sampleRatio;
+}
+
+static char* parseFormat(char fmtcode)
+{
+	if (fmttype_is_custom(fmtcode))
+		return "CUSTOM";
+	if (fmttype_is_text(fmtcode))
+		return "TEXT";
+	if (fmttype_is_csv(fmtcode))
+		return "CSV";
+
+	elog(ERROR, "Unrecognized external table format '%c'", fmtcode);
+	return NULL;
+}
+
+/* Helper functions from dumputils.c, modified to backend (malloc->palloc) */
+
+/*
+ * Escape any unprintables (0x00 - 0x1F) in given string
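+ * (except TAB), replacing each with a \xNN escape sequence;
+ * e.g. a string containing byte 0x01 yields the 4 characters "\x01".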
+ */
+char *
+escape_unprintables(const char *src)
+{
+	int			len = strlen(src),
+				i,
+				j;
+	char	   *result = palloc0(len * 4 + 1);	/* palloc0 errors out on OOM, so no NULL check is needed */
+
+	for (i = 0, j = 0; i < len; i++)
+	{
+		if ((src[i] <= '\x1F') && (src[i] != '\x09' /* TAB */))
+		{
+			snprintf(&(result[j]), 5, "\\x%02X", src[i]);
+			j += 4;
+		}
+		else
+			result[j++] = src[i];
+	}
+	result[j] = '\0';
+	return result;
+}
+
+/*
+ * Escape backslashes and apostrophes in EXTERNAL TABLE format strings.
+ *
+ * The fmtopts field of a pg_exttable tuple has an odd encoding -- it is
+ * partially parsed and contains "string" values that aren't legal SQL.
+ * Each string value is delimited by apostrophes and is usually, but not
+ * always, a single character.	The fmtopts field is typically something
+ * like {delimiter '\x09' null '\N' escape '\'} or
+ * {delimiter ',' null '' escape '\' quote '''}.  Each backslash and
+ * apostrophe in a string must be escaped and each string must be
+ * prepended with an 'E' denoting an "escape syntax" string.
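+ *
+ * For example (illustrative): delimiter ',' escape '\' becomes
+ * delimiter E',' escape E'\\'.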
+ *
+ * Usage note: A field value containing an apostrophe followed by a space
+ * will throw this algorithm off -- it presumes no embedded spaces.
+ */
+static char* escape_fmtopts_string(const char *src)
+{
+	int			len = strlen(src);
+	int			i;
+	int			j;
+	char	   *result = palloc0(len * 2 + 1);
+	bool		inString = false;
+
+	for (i = 0, j = 0; i < len; i++)
+	{
+		switch (src[i])
+		{
+			case '\'':
+				if (inString)
+				{
+					/*
+					 * Escape apostrophes *within* the string. If the
+					 * apostrophe is at the end of the source string or is
+					 * followed by a space, it is presumed to be a closing
+					 * apostrophe and is not escaped.
+					 */
+					if ((i + 1) == len || src[i + 1] == ' ')
+						inString = false;
+					else
+						result[j++] = '\\';
+				}
+				else
+				{
+					result[j++] = 'E';
+					inString = true;
+				}
+				break;
+			case '\\':
+				result[j++] = '\\';
+				break;
+		}
+
+		result[j++] = src[i];
+	}
+
+	result[j] = '\0';
+	return result;
+}
+
+/*
+ * Tokenize a fmtopts string (for use with 'custom' formatters),
+ * i.e. convert it to "key = value" pairs.
+ * For example: formatter E'fixedwidth_in' null E' ' preserve_blanks E'on'
+ * becomes:     formatter = 'fixedwidth_in',null = ' ',preserve_blanks = 'on'
+ */
+static char* custom_fmtopts_string(const char *src)
+{
+		int			len = src ? strlen(src) : 0;
+		char	   *result = palloc0(len * 2 + 1);
+		char	   *srcdup = src ? pstrdup(src) : NULL;
+		char	   *srcdup_start = srcdup;
+		char       *find_res = NULL;
+		int        last = 0;
+
+		if(!src || !srcdup || !result)
+			return NULL;
+
+		while (srcdup)
+		{
+			/* find first word (a) */
+			find_res = strchr(srcdup, ' ');
+			if (!find_res)
+				break;
+			strncat(result, srcdup, (find_res - srcdup));
+			/* skip space */
+			srcdup = find_res + 1;
+			/* remove E if E' */
+			if((strlen(srcdup) > 2) && (srcdup[0] == 'E') && (srcdup[1] == '\''))
+				srcdup++;
+			/* add " = " */
+			strncat(result, " = ", 3);
+			/* find the closing quote of the second word (b),
+			   skipping escaped \' combinations */
+			find_res = strchr(srcdup + 1, '\'');
+			while (find_res && (*(find_res - 1) == '\\') /* ignore \' */)
+			{
+				find_res = strchr(find_res + 1, '\'');
+			}
+			if (!find_res)
+				break;
+			strncat(result, srcdup, (find_res - srcdup + 1));
+			srcdup = find_res + 1;
+			/* skip space and add ',' */
+			if (srcdup && srcdup[0] == ' ')
+			{
+				srcdup++;
+				strncat(result, ",", 1);
+			}
+		}
+
+		/* fix string - remove trailing ',' or '=' */
+		last = strlen(result) - 1;
+		if (last >= 0 && (result[last] == ',' || result[last] == '='))
+			result[last] = '\0';
+
+		pfree(srcdup_start);
+		return result;
+}
+
+static void printExtTable(Oid relationOid, ExtTableEntry* extTable)
+{
+
+	if (extTable == NULL)
+		return;
+
+	elog(DEBUG2, "extTable params: oid: %d command: %s, encoding: %d, "
+			"format: %c (%s), error table oid: %d, format options: %s, "
+			"is web: %d, is writable: %d, locations size: %d, "
+			"reject limit: %d, reject limit type: %c",
+			relationOid,
+			extTable->command ? extTable->command : "NULL",
+			extTable->encoding,
+			extTable->fmtcode,
+			parseFormat(extTable->fmtcode),
+			extTable->fmterrtbl,
+			extTable->fmtopts,
+			extTable->isweb,
+			extTable->iswritable,
+			list_length(extTable->locations),
+			extTable->rejectlimit,
+			extTable->rejectlimittype == -1 ? 'n' : extTable->rejectlimittype);
+}
+
+/*
+ * This method returns an SQL command to create a PXF table
+ * which is a copy of the given PXF table relationOid, with the following changes:
+ * - the PXF sample table name is pg_temp.pg_analyze_pxf_<relationOid>
+ * - 2 attributes are appended to the LOCATION clause:
+ *   STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS
+ * - if the original table has a reject limit, SEGMENT REJECT LIMIT 25 PERCENT is used
+ *
+ * Input:
+ * 	relationOid 		- relation to be copied
+ * 	schemaName 			- schema name of original table
+ * 	tableName			- table name of original table
+ * 	sampleSchemaName	- schema name of new table
+ * 	pxfSampleTable		- table name of new table
+ * 	pxf_sample_ratio	- ratio of samplings to be done per fragment
+ * 	pxf_max_fragments	- max number of fragments to be sampled
+ * Output:
+ * 	SQL statement string for creating the new table
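+ *
+ * A hypothetical generated statement (all values illustrative only):
+ * 	CREATE EXTERNAL TABLE pg_temp.pg_analyze_pxf_16384 (LIKE public.t)
+ * 	LOCATION(E'pxf://host:51200/data?PROFILE=HdfsTextSimple&STATS-SAMPLE-RATIO=0.0100&STATS-MAX-FRAGMENTS=100')
+ * 	FORMAT 'text' (delimiter E',') ENCODING 'UTF8' SEGMENT REJECT LIMIT 25 PERCENT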
+ */
+static char* createPxfSampleStmt(Oid relationOid,
+		const char* schemaName, const char* tableName,
+		const char* sampleSchemaName, const char* pxfSampleTable,
+		float4 pxf_sample_ratio, int pxf_max_fragments)
+{
+	ExtTableEntry *extTable = GetExtTableEntry(relationOid);
+	StringInfoData str;
+	char* location = NULL;
+	char* tmpstring = NULL;
+	char* escapedfmt = NULL;
+	char* tabfmt = NULL;
+	char* customfmt = NULL;
+
+	initStringInfo(&str);
+
+	printExtTable(relationOid, extTable);
+
+	location = escape_unprintables(((Value*)list_nth(extTable->locations, 0))->val.str /*pxfLocation*/);
+
+	appendStringInfo(&str, "CREATE EXTERNAL TABLE %s.%s (LIKE %s.%s) "
+			"LOCATION(E'%s&STATS-SAMPLE-RATIO=%.4f&STATS-MAX-FRAGMENTS=%d') ",
+			quote_identifier(sampleSchemaName),
+			quote_identifier(pxfSampleTable),
+			quote_identifier(schemaName),
+			quote_identifier(tableName),
+			location,
+			pxf_sample_ratio,
+			pxf_max_fragments);
+
+	pfree(location);
+
+	/* add FORMAT clause */
+	escapedfmt = escape_fmtopts_string((const char *) extTable->fmtopts);
+	tmpstring = escape_unprintables((const char *) escapedfmt);
+	pfree(escapedfmt);
+	escapedfmt = NULL;
+
+	switch (extTable->fmtcode)
+	{
+	case 't':
+		tabfmt = "text";
+		break;
+	case 'b':
+		/*
+		 * b denotes that a custom format is used.
+		 * the fmtopts string should be formatted as:
+		 * a1 = 'val1',...,an = 'valn'
+		 *
+		 */
+		tabfmt = "custom";
+		customfmt = custom_fmtopts_string(tmpstring);
+		break;
+	default:
+		tabfmt = "csv";
+	}
+	appendStringInfo(&str, "FORMAT '%s' (%s) ",
+			tabfmt,
+			customfmt ? customfmt : tmpstring);
+	pfree(tmpstring);
+	tmpstring = NULL;
+	if (customfmt)
+	{
+		pfree(customfmt);
+		customfmt = NULL;
+	}
+	/* add ENCODING clause */
+	appendStringInfo(&str, "ENCODING '%s' ", pg_encoding_to_char(extTable->encoding));
+
+	/* add error control clause */
+	if (extTable->rejectlimit != -1)
+	{
+		appendStringInfo(&str, "%s", "SEGMENT REJECT LIMIT 25 PERCENT ");
+	}
+
+	elog(DEBUG2, "createPxfSampleStmt SQL statement: %s", str.data);
+
+	return str.data;
+}
+
+/*
+ * Returns the number of tuples in the first fragment of given
+ * PXF table.
+ * This is done by creating a copy of the PXF table, with additional parameters
+ * limiting the query to the first fragment only (pxf_max_fragments = 1, pxf_sample_ratio = 1.0),
+ * and running a COUNT query on it.
+ * The tuple count result is returned.
+ *
+ * Input:
+ * 	relationOid 	- relation to be sampled
+ */
+static float4 getPxfFragmentTupleCount(Oid relationOid)
+{
+	const char *schemaName = get_namespace_name(get_rel_namespace(relationOid)); /* must be pfreed */
+	const char *tableName = get_rel_name(relationOid); /* must be pfreed */
+	char	*sampleSchemaName = pstrdup("pg_temp");
+	char	*pxfEstimateTable = temporarySampleTableName(relationOid, "pg_analyze_pxf_est"); /* must be pfreed */
+	Oid			pxfEstimateTableOid = InvalidOid;
+	RangeVar 	*rangeVar = NULL;
+	float4	ntuples = -1.0;
+
+	/* build copy of original pxf table */
+	buildPxfTableCopy(relationOid,
+					  1.0, /* get all tuples */
+					  1, /* query only first fragment */
+					  schemaName, tableName,
+					  sampleSchemaName, pxfEstimateTable);
+
+	rangeVar = makeRangeVar(NULL /*catalogname*/, sampleSchemaName, pxfEstimateTable, -1);
+	pxfEstimateTableOid = RangeVarGetRelid(rangeVar, true /* failOK */, false /*allowHcatalog*/);
+
+	if (pxfEstimateTableOid == InvalidOid)
+	{
+		elog(ERROR, "Unable to create a copy of PXF table %s.%s", schemaName, tableName);
+	}
+
+	/* run count query */
+	ntuples = countFirstFragmentTuples(sampleSchemaName, pxfEstimateTable);
+
+	Assert(pxfEstimateTableOid != InvalidOid);
+
+	elog(DEBUG2, "ANALYZE dropping PXF estimate table %s.%s (%d)",
+		 sampleSchemaName, pxfEstimateTable, pxfEstimateTableOid);
+	dropSampleTable(pxfEstimateTableOid, true);
+
+	pfree((void *) rangeVar);
+	pfree((void *) pxfEstimateTable);
+	pfree((void *) tableName);
+	pfree((void *) schemaName);
+	pfree((void *) sampleSchemaName);
+
+	return ntuples;
+}
+
+static float4 countFirstFragmentTuples(const char* schemaName,
+									   const char* tableName)
+{
+	float4 ntuples = -1.0;
+	StringInfoData str;
+
+	initStringInfo(&str);
+	appendStringInfo(&str, "select count(*)::float4 from %s.%s",
+			quote_identifier(schemaName),
+			quote_identifier(tableName));
+
+	/* in case of PXF error, analyze on this table will be reverted */
+	spiExecuteWithCallback(str.data, false /*readonly*/, 0 /*tcount */,
+						   spiCallback_getSingleResultRowColumnAsFloat4, &ntuples);
+
+	pfree(str.data);
+
+	elog(DEBUG3, "count() of first pxf fragment returned %f tuples.", ntuples);
+
+	return ntuples;
+}
+
+/* --------------------------------
+ *		getFragmentStats  -
+ *
+ *		Fetch the number of fragments, the size of the first fragment, and the
+ *		total size of the data source for a PXF external table
+ * --------------------------------
+ */
+static void getFragmentStats(Relation rel, StringInfo location,
+							 float4 *numfrags, float4 *firstfragsize,
+							 float4 *totalsize)
+{
+
+	PxfFragmentStatsElem *elem = NULL;
+	elem = get_pxf_fragments_statistics(location->data, rel);
+
+	/*
+	 * if get_pxf_fragments_statistics returned NULL - probably a communication error, we
+	 * error out.
+	 */
+	if (!elem)
+	{
+		elog(ERROR, "No statistics were returned for relation %s", RelationGetRelationName(rel));
+	}
+
+	*numfrags = elem->numFrags;
+	*firstfragsize = elem->firstFragSize;
+	*totalsize = elem->totalSize;
+	pfree(elem);
+
+	elog(DEBUG2, "ANALYZE estimate for PXF table %s: fragments %f, first frag size %f, "
+			"total size %f [max int %d]",
+			RelationGetRelationName(rel), *numfrags, *firstfragsize, *totalsize, INT_MAX);
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/pxfmasterapi.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfmasterapi.c b/src/backend/access/external/pxfmasterapi.c
index ba505b3..b1a6240 100644
--- a/src/backend/access/external/pxfmasterapi.c
+++ b/src/backend/access/external/pxfmasterapi.c
@@ -30,11 +30,11 @@
 #include "catalog/hcatalog/externalmd.h"
 
 static List* parse_datanodes_response(List *rest_srvrs, StringInfo rest_buf);
-static PxfStatsElem* parse_get_stats_response(StringInfo rest_buf);
+static PxfFragmentStatsElem *parse_get_frag_stats_response(StringInfo rest_buf);
+static float4 normalize_size(long size, char* unit);
 static List* parse_get_fragments_response(List* fragments, StringInfo rest_buf);
 static void ha_failover(GPHDUri *hadoop_uri, ClientContext *client_context, char* rest_msg);
-static void rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *
-);
+static void rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *rest_msg);
 static char* concat(char *body, char *tail);
 
 /*
@@ -72,14 +72,14 @@ parse_datanodes_response(List *rest_srvrs, StringInfo rest_buf)
  * Wrap the REST call with a retry for the HA HDFS scenario
  */
 static void
-rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *restMsg)
+rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *rest_msg)
 {
 	Assert(hadoop_uri->host != NULL && hadoop_uri->port != NULL);
 
 	/* construct the request */
 	PG_TRY();
 	{
-		call_rest(hadoop_uri, client_context, restMsg);
+		call_rest(hadoop_uri, client_context, rest_msg);
 	}
 	PG_CATCH();
 	{
@@ -93,7 +93,7 @@ rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *restMsg)
 			if (!elog_dismiss(DEBUG5))
 				PG_RE_THROW(); /* hope to never get here! */
 
-			ha_failover(hadoop_uri, client_context, restMsg);
+			ha_failover(hadoop_uri, client_context, rest_msg);
 		}
 		else /*This is not HA - so let's re-throw */
 			PG_RE_THROW();
@@ -140,77 +140,83 @@ void free_datanode_rest_server(PxfServer* srv)
 }
 
 /*
- * Fetch the statistics from the PXF service
+ * Fetch fragment statistics from the PXF service
  */
-PxfStatsElem *get_data_statistics(GPHDUri* hadoop_uri,
-										 ClientContext *client_context,
-										 StringInfo err_msg)
+PxfFragmentStatsElem *get_fragments_statistics(GPHDUri* hadoop_uri,
+											   ClientContext *client_context)
 {
-	char *restMsg = concat("http://%s:%s/%s/%s/Analyzer/getEstimatedStats?path=", hadoop_uri->data);
+	char *restMsg = concat("http://%s:%s/%s/%s/Fragmenter/getFragmentsStats?path=", hadoop_uri->data);
 
 	/* send the request. The response will exist in rest_buf.data */
-	PG_TRY();
-	{
-		rest_request(hadoop_uri, client_context, restMsg);
-	}
-	PG_CATCH();
-	{
-		/*
-		 * communication problems with PXF service
-		 * Statistics for a table can be done as part of an ANALYZE procedure on many tables,
-		 * and we don't want to stop because of a communication error. So we catch the exception,
-		 * append its error to err_msg, and return a NULL,
-		 * which will force the the analyze code to use former calculated values or defaults.
-		 */
-		if (err_msg)
-		{
-			char* message = elog_message();
-			if (message)
-				appendStringInfo(err_msg, "%s", message);
-			else
-				appendStringInfo(err_msg, "Unknown error");
-		}
-
-		/* release error state */
-		if (!elog_dismiss(DEBUG5))
-			PG_RE_THROW(); /* hope to never get here! */
-
-		return NULL;
-	}
-	PG_END_TRY();
+	rest_request(hadoop_uri, client_context, restMsg);
 
-	/* parse the JSON response and form a fragments list to return */
-	return parse_get_stats_response(&(client_context->the_rest_buf));
+	/* parse the JSON response and form a statistics struct to return */
+	return parse_get_frag_stats_response(&(client_context->the_rest_buf));
 }
 
 /*
- * Parse the json response from the PXF Fragmenter.getSize
+ * Parse the json response from the PXF Fragmenter.getFragmentsStats
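+ *
+ * The response is expected to have the following shape (values illustrative):
+ * {"PXFFragmentsStats":{"fragmentsNumber":3,
+ *  "firstFragmentSize":{"size":67,"unit":"MB"},
+ *  "totalSize":{"size":200,"unit":"MB"}}}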
  */
-static PxfStatsElem *parse_get_stats_response(StringInfo rest_buf)
+static PxfFragmentStatsElem *parse_get_frag_stats_response(StringInfo rest_buf)
 {
-	PxfStatsElem* statsElem = (PxfStatsElem*)palloc0(sizeof(PxfStatsElem));
+	PxfFragmentStatsElem* statsElem = (PxfFragmentStatsElem*)palloc0(sizeof(PxfFragmentStatsElem));
 	struct json_object	*whole	= json_tokener_parse(rest_buf->data);
 	if ((whole == NULL) || is_error(whole))
 	{
 		elog(ERROR, "Failed to parse statistics data from PXF");
 	}
-	struct json_object	*head	= json_object_object_get(whole, "PXFDataSourceStats");
-
-	/* 0. block size */
-	struct json_object *js_block_size = json_object_object_get(head, "blockSize");
-	statsElem->blockSize = json_object_get_int(js_block_size);
-
-	/* 1. number of blocks */
-	struct json_object *js_num_blocks = json_object_object_get(head, "numberOfBlocks");
-	statsElem->numBlocks = json_object_get_int(js_num_blocks);
-
-	/* 2. number of tuples */
-	struct json_object *js_num_tuples = json_object_object_get(head, "numberOfTuples");
-	statsElem->numTuples = json_object_get_int(js_num_tuples);
+	struct json_object	*head	= json_object_object_get(whole, "PXFFragmentsStats");
+
+	/* 0. number of fragments */
+	struct json_object *js_num_fragments = json_object_object_get(head, "fragmentsNumber");
+	statsElem->numFrags = json_object_get_int(js_num_fragments);
+
+	/* 1. first fragment size */
+	struct json_object *js_first_frag_size = json_object_object_get(head, "firstFragmentSize");
+	struct json_object *js_size = json_object_object_get(js_first_frag_size, "size");
+	long size = json_object_get_int(js_size);
+	struct json_object *js_unit = json_object_object_get(js_first_frag_size, "unit");
+	char* unit = pstrdup(json_object_get_string(js_unit));
+	statsElem->firstFragSize = normalize_size(size, unit);
+	pfree(unit);
+
+	/* 2. total size */
+	struct json_object *js_total_size = json_object_object_get(head, "totalSize");
+	js_size = json_object_object_get(js_total_size, "size");
+	size = json_object_get_int(js_size);
+	js_unit = json_object_object_get(js_total_size, "unit");
+	unit = pstrdup(json_object_get_string(js_unit));
+	statsElem->totalSize = normalize_size(size, unit);
+	pfree(unit);
 
 	return statsElem;
 }
 
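+/*
+ * Converts a size given in unit "B", "KB", "MB", "GB" or "TB"
+ * to a size in bytes, e.g. (2, "KB") -> 2048.0.
+ * Returns -1 for an unrecognized unit.
+ */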
+static float4 normalize_size(long size, char* unit)
+{
+	const float4 multiplier = 1024.0;
+	if (strcmp(unit, "B") == 0)
+	{
+		return size;
+	}
+	if (strcmp(unit, "KB") == 0)
+	{
+		return size * multiplier;
+	}
+	if (strcmp(unit, "MB") == 0)
+	{
+		return size * multiplier * multiplier;
+	}
+	if (strcmp(unit, "GB") == 0)
+	{
+		return size * multiplier * multiplier * multiplier;
+	}
+	if (strcmp(unit, "TB") == 0)
+	{
+		return size * multiplier * multiplier * multiplier * multiplier;
+	}
+	return -1;
+}
+
 /*
  * ha_failover
  *

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/Makefile b/src/backend/access/external/test/Makefile
index e13d107..5778958 100644
--- a/src/backend/access/external/test/Makefile
+++ b/src/backend/access/external/test/Makefile
@@ -1,7 +1,7 @@
 subdir=src/backend/access/external
 top_builddir=../../../../..
 
-TARGETS=pxfuriparser hd_work_mgr pxfheaders ha_config pxffilters pxfmasterapi
+TARGETS=pxfuriparser hd_work_mgr pxfheaders ha_config pxffilters pxfmasterapi pxfanalyze
 
 # Objects from backend, which don't need to be mocked but need to be linked.
 COMMON_REAL_OBJS=\
@@ -59,7 +59,11 @@ pxfmasterapi_REAL_OBJS=$(COMMON_REAL_OBJS) \
 	$(top_srcdir)/src/backend/utils/fmgrtab.o	
 pxffilters_REAL_OBJS=$(COMMON_REAL_OBJS) \
 	$(top_srcdir)/src/backend/optimizer/util/clauses.o \
-	$(top_srcdir)/src/backend/parser/parse_expr.o    
+	$(top_srcdir)/src/backend/parser/parse_expr.o
+pxfanalyze_REAL_OBJS=$(COMMON_REAL_OBJS) \
+	$(top_srcdir)/src/backend/utils/adt/ruleutils.o \
+	$(top_srcdir)/src/backend/parser/kwlookup.o \
+	$(top_srcdir)/src/backend/utils/mb/encnames.o    
 
 include ../../../../Makefile.mock
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/README.txt
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/README.txt b/src/backend/access/external/test/README.txt
index 75a03f0..dfe73b8 100644
--- a/src/backend/access/external/test/README.txt
+++ b/src/backend/access/external/test/README.txt
@@ -1,6 +1,9 @@
 Directory with the following System Under Test (SUT):
- - pxfuriparser.c
- - hd_work_mgr.c
- - pxfheaders.c
  - ha_config.c
+ - hd_work_mgr.c
+ - pxfanalyze.c
  - pxffilters.c
+ - pxfheaders.c
+ - pxfmasterapi.c
+ - pxfuriparser.c
+ 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
index ce8e61d..91bfa43 100644
--- a/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
+++ b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
@@ -28,15 +28,17 @@
 
 /*
  * check element list_index in segmenet_list
- * has the expected hostip.
+ * has the expected hostip and segindex.
  */
 void check_segment_info(List* segment_list, int list_index,
-						const char* expected_hostip)
+						const char* expected_hostip,
+						int expected_segindex)
 {
 
 	Segment* seg_info =
 			(Segment*)(list_nth(segment_list, list_index));
 	assert_string_equal(seg_info->hostip, expected_hostip);
+	assert_int_equal(seg_info->segindex, expected_segindex);
 }
 
 /*
@@ -76,9 +78,9 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
 	gphost = (GpHost*)lfirst(cell);
 	assert_string_equal(gphost->ip, array_of_segs[0]);
 	assert_int_equal(list_length(gphost->segs), 4);
-    for (int i = 0; i < 4; ++i)
+	for (int i = 0; i < 4; ++i)
 	{
-		check_segment_info(gphost->segs, i, "1.2.3.1");
+		check_segment_info(gphost->segs, i, "1.2.3.1", i);
 	}
 
 	cell = list_nth_cell(groups, 1);
@@ -87,7 +89,7 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
 	assert_int_equal(list_length(gphost->segs), 3);
 	for (int i = 0; i < 3; ++i)
 	{
-		check_segment_info(gphost->segs, i, "1.2.3.2");
+		check_segment_info(gphost->segs, i, "1.2.3.2", i+4);
 	}
 
 	cell = list_nth_cell(groups, 2);
@@ -96,9 +98,8 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
 	assert_int_equal(list_length(gphost->segs), 3);
 	for (int i = 0; i < 3; ++i)
 	{
-		check_segment_info(gphost->segs, i, "1.2.3.3");
+		check_segment_info(gphost->segs, i, "1.2.3.3", i+7);
 	}
 
 	freeQueryResource();
 }
-