You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by nh...@apache.org on 2015/11/20 22:50:40 UTC
[2/3] incubator-hawq git commit: HAWQ-44. Advanced statistics for PXF
tables.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 06ff72b..a734ade 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -33,6 +33,9 @@ public class ProtocolData extends InputData {
protected String host;
protected String profile;
protected String token;
+ // statistics parameters
+ protected int statsMaxFragments;
+ protected float statsSampleRatio;
/**
* Constructs a ProtocolData. Parses X-GP-* configuration variables.
@@ -88,6 +91,10 @@ public class ProtocolData extends InputData {
dataFragment = INVALID_SPLIT_IDX;
parseDataFragment(getOptionalProperty("DATA-FRAGMENT"));
+ statsMaxFragments = 0;
+ statsSampleRatio = 0;
+ parseStatsParameters();
+
// Store alignment for global use as a system property
System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
}
@@ -120,10 +127,8 @@ public class ProtocolData extends InputData {
this.remoteLogin = copy.remoteLogin;
this.remoteSecret = copy.remoteSecret;
this.token = copy.token;
- }
-
- public String getToken() {
- return token;
+ this.statsMaxFragments = copy.statsMaxFragments;
+ this.statsSampleRatio = copy.statsSampleRatio;
}
/**
@@ -264,6 +269,38 @@ public class ProtocolData extends InputData {
}
/**
+ * Returns Kerberos token information.
+ *
+ * @return token
+ */
+ public String getToken() {
+ return token;
+ }
+
+ /**
+ * Statistics parameter. Returns the max number of fragments to return for
+ * ANALYZE sampling. The value is set on the HAWQ side using the GUC
+ * pxf_stats_max_fragments and is read from the STATS-MAX-FRAGMENTS
+ * request parameter.
+ *
+ * @return max number of fragments to be processed by analyze, or 0 when
+ * the request did not carry the parameter
+ */
+ public int getStatsMaxFragments() {
+ return statsMaxFragments;
+ }
+
+ /**
+ * Statistics parameter. Returns a number between 0.0001 and 1.0,
+ * representing the sampling ratio on each fragment for ANALYZE sampling.
+ * The value is set on the HAWQ side based on ANALYZE computations and the
+ * number of sampled fragments, and is read from the STATS-SAMPLE-RATIO
+ * request parameter.
+ *
+ * @return sampling ratio, or 0 when the request did not carry the
+ * parameter
+ */
+ public float getStatsSampleRatio() {
+ return statsSampleRatio;
+ }
+
+ /**
* Sets the thread safe parameter. Default value - true.
*/
private void parseThreadSafe() {
@@ -371,4 +408,34 @@ public class ProtocolData extends InputData {
remoteLogin = getOptionalProperty("REMOTE-USER");
remoteSecret = getOptionalProperty("REMOTE-PASS");
}
+
+ /**
+ * Parses the statistics parameters STATS-MAX-FRAGMENTS and
+ * STATS-SAMPLE-RATIO. Each one is optional on its own line of parsing, but
+ * they must be supplied together or not at all. A parameter that is absent
+ * or empty leaves the corresponding field untouched.
+ *
+ * @throws IllegalArgumentException if STATS-MAX-FRAGMENTS is not a
+ * positive integer, if STATS-SAMPLE-RATIO is not a number in
+ * the range 0.0001 to 1.0 (inclusive), or if only one of the
+ * two parameters is present. Unparsable text surfaces as
+ * NumberFormatException, which is a subclass.
+ */
+ private void parseStatsParameters() {
+
+ String maxFrags = getOptionalProperty("STATS-MAX-FRAGMENTS");
+ if (!StringUtils.isEmpty(maxFrags)) {
+ statsMaxFragments = Integer.parseInt(maxFrags);
+ if (statsMaxFragments <= 0) {
+ throw new IllegalArgumentException("Wrong value '"
+ + statsMaxFragments + "'. "
+ + "STATS-MAX-FRAGMENTS must be a positive integer");
+ }
+ }
+
+ // NOTE(review): this parameter is read with getUserProperty while
+ // STATS-MAX-FRAGMENTS uses getOptionalProperty - confirm both resolve
+ // the same request key.
+ String sampleRatioStr = getUserProperty("STATS-SAMPLE-RATIO");
+ if (!StringUtils.isEmpty(sampleRatioStr)) {
+ statsSampleRatio = Float.parseFloat(sampleRatioStr);
+ // Compare against float literals: the float nearest to 0.0001
+ // widens to a double slightly below 0.0001, so a comparison with
+ // the double literal would reject the documented minimum. The
+ // range test is written in positive form and negated so that NaN
+ // (for which every comparison is false) is rejected as well.
+ boolean inRange = statsSampleRatio >= 0.0001f
+ && statsSampleRatio <= 1.0f;
+ if (!inRange) {
+ throw new IllegalArgumentException(
+ "Wrong value '"
+ + statsSampleRatio
+ + "'. "
+ + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+ }
+ }
+
+ // Both fields are strictly positive once parsed, so "> 0" means
+ // "was supplied"; exactly one supplied is an error.
+ if ((statsSampleRatio > 0) != (statsMaxFragments > 0)) {
+ throw new IllegalArgumentException(
+ "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
index 137622d..57fb0f4 100644
--- a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
@@ -2,66 +2,156 @@ package org.apache.hawq.pxf.service;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.DataOutput;
+import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.junit.Test;
-
import org.apache.hawq.pxf.api.BadRecordException;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.BufferWritable;
import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.Writable;
import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.junit.Test;
public class BridgeOutputBuilderTest {
+ /**
+ * Test helper used to check the data inside a BufferWritable. Only
+ * {@link #write(byte[])} is supported: it keeps a reference to the array
+ * it was given (no copy is made) so that the test can read it back through
+ * {@link #getOutput()}. Every other DataOutput method throws an
+ * IOException with the message "not implemented".
+ */
+ private class DataOutputToBytes implements DataOutput {
+
+ byte[] output;
+
+ public byte[] getOutput() {
+ return output;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ output = b;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException {
+ throw new IOException("not implemented");
+ }
+ }
+
private static final int UN_SUPPORTED_TYPE = -1;
private GPDBWritable output = null;
+ private DataOutputToBytes dos = new DataOutputToBytes();
@Test
public void testFillGPDBWritable() throws Exception {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put("X-GP-ATTRS", "14");
- addColumn(parameters, 0, DataType.INTEGER, "col0");
- addColumn(parameters, 1, DataType.FLOAT8, "col1");
- addColumn(parameters, 2, DataType.REAL, "col2");
- addColumn(parameters, 3, DataType.BIGINT, "col3");
- addColumn(parameters, 4, DataType.SMALLINT, "col4");
- addColumn(parameters, 5, DataType.BOOLEAN, "col5");
- addColumn(parameters, 6, DataType.BYTEA, "col6");
- addColumn(parameters, 7, DataType.VARCHAR, "col7");
- addColumn(parameters, 8, DataType.BPCHAR, "col8");
- addColumn(parameters, 9, DataType.CHAR, "col9");
- addColumn(parameters, 10, DataType.TEXT, "col10");
- addColumn(parameters, 11, DataType.NUMERIC, "col11");
- addColumn(parameters, 12, DataType.TIMESTAMP,"col12");
- addColumn(parameters, 13, DataType.DATE, "col13");
-
+ addColumn(parameters, 0, DataType.INTEGER, "col0");
+ addColumn(parameters, 1, DataType.FLOAT8, "col1");
+ addColumn(parameters, 2, DataType.REAL, "col2");
+ addColumn(parameters, 3, DataType.BIGINT, "col3");
+ addColumn(parameters, 4, DataType.SMALLINT, "col4");
+ addColumn(parameters, 5, DataType.BOOLEAN, "col5");
+ addColumn(parameters, 6, DataType.BYTEA, "col6");
+ addColumn(parameters, 7, DataType.VARCHAR, "col7");
+ addColumn(parameters, 8, DataType.BPCHAR, "col8");
+ addColumn(parameters, 9, DataType.CHAR, "col9");
+ addColumn(parameters, 10, DataType.TEXT, "col10");
+ addColumn(parameters, 11, DataType.NUMERIC, "col11");
+ addColumn(parameters, 12, DataType.TIMESTAMP, "col12");
+ addColumn(parameters, 13, DataType.DATE, "col13");
BridgeOutputBuilder builder = makeBuilder(parameters);
output = builder.makeGPDBWritableOutput();
- List<OneField> recFields = Arrays.asList(new OneField(DataType.INTEGER.getOID(), 0),
- new OneField(DataType.FLOAT8.getOID(), (double) 0),
- new OneField(DataType.REAL.getOID(), (float) 0),
- new OneField(DataType.BIGINT.getOID(), (long) 0),
- new OneField(DataType.SMALLINT.getOID(), (short) 0),
- new OneField(DataType.BOOLEAN.getOID(), true),
- new OneField(DataType.BYTEA.getOID(), new byte[]{0}),
- new OneField(DataType.VARCHAR.getOID(), "value"),
- new OneField(DataType.BPCHAR.getOID(), "value"),
- new OneField(DataType.CHAR.getOID(), "value"),
- new OneField(DataType.TEXT.getOID(), "value"),
- new OneField(DataType.NUMERIC.getOID(), "0"),
- new OneField(DataType.TIMESTAMP.getOID(), new Timestamp(0)),
+ List<OneField> recFields = Arrays.asList(
+ new OneField(DataType.INTEGER.getOID(), 0), new OneField(
+ DataType.FLOAT8.getOID(), (double) 0), new OneField(
+ DataType.REAL.getOID(), (float) 0), new OneField(
+ DataType.BIGINT.getOID(), (long) 0), new OneField(
+ DataType.SMALLINT.getOID(), (short) 0), new OneField(
+ DataType.BOOLEAN.getOID(), true), new OneField(
+ DataType.BYTEA.getOID(), new byte[] { 0 }),
+ new OneField(DataType.VARCHAR.getOID(), "value"), new OneField(
+ DataType.BPCHAR.getOID(), "value"), new OneField(
+ DataType.CHAR.getOID(), "value"), new OneField(
+ DataType.TEXT.getOID(), "value"), new OneField(
+ DataType.NUMERIC.getOID(), "0"), new OneField(
+ DataType.TIMESTAMP.getOID(), new Timestamp(0)),
new OneField(DataType.DATE.getOID(), new Date(1)));
builder.fillGPDBWritable(recFields);
@@ -71,30 +161,33 @@ public class BridgeOutputBuilderTest {
assertEquals(output.getLong(3), Long.valueOf(0));
assertEquals(output.getShort(4), Short.valueOf((short) 0));
assertEquals(output.getBoolean(5), true);
- assertArrayEquals(output.getBytes(6), new byte[]{0});
+ assertArrayEquals(output.getBytes(6), new byte[] { 0 });
assertEquals(output.getString(7), "value\0");
assertEquals(output.getString(8), "value\0");
assertEquals(output.getString(9), "value\0");
assertEquals(output.getString(10), "value\0");
assertEquals(output.getString(11), "0\0");
assertEquals(Timestamp.valueOf(output.getString(12)), new Timestamp(0));
- assertEquals(Date.valueOf(output.getString(13).trim()).toString(), new Date(1).toString());
+ assertEquals(Date.valueOf(output.getString(13).trim()).toString(),
+ new Date(1).toString());
}
@Test
public void testFillOneGPDBWritableField() throws Exception {
Map<String, String> parameters = new HashMap<String, String>();
- parameters.put("X-GP-ATTRS", "1");
- addColumn(parameters, 0, DataType.INTEGER, "col0");
+ parameters.put("X-GP-ATTRS", "1");
+ addColumn(parameters, 0, DataType.INTEGER, "col0");
BridgeOutputBuilder builder = makeBuilder(parameters);
output = builder.makeGPDBWritableOutput();
- OneField unSupportedField = new OneField(UN_SUPPORTED_TYPE, new Byte((byte) 0));
+ OneField unSupportedField = new OneField(UN_SUPPORTED_TYPE, new Byte(
+ (byte) 0));
try {
builder.fillOneGPDBWritableField(unSupportedField, 0);
fail("Unsupported data type should throw exception");
} catch (UnsupportedOperationException e) {
- assertEquals(e.getMessage(), "Byte is not supported for HAWQ conversion");
+ assertEquals(e.getMessage(),
+ "Byte is not supported for HAWQ conversion");
}
}
@@ -113,10 +206,10 @@ public class BridgeOutputBuilderTest {
/* all four fields */
List<OneField> complete = Arrays.asList(
- new OneField(DataType.INTEGER.getOID(), 10),
- new OneField(DataType.INTEGER.getOID(), 20),
- new OneField(DataType.INTEGER.getOID(), 30),
- new OneField(DataType.INTEGER.getOID(), 40));
+ new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+ DataType.INTEGER.getOID(), 20), new OneField(
+ DataType.INTEGER.getOID(), 30), new OneField(
+ DataType.INTEGER.getOID(), 40));
builder.fillGPDBWritable(complete);
assertEquals(output.getColType().length, 4);
assertEquals(output.getInt(0), Integer.valueOf(10));
@@ -126,13 +219,14 @@ public class BridgeOutputBuilderTest {
/* two fields instead of four */
List<OneField> incomplete = Arrays.asList(
- new OneField(DataType.INTEGER.getOID(), 10),
- new OneField(DataType.INTEGER.getOID(), 20));
+ new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+ DataType.INTEGER.getOID(), 20));
try {
builder.fillGPDBWritable(incomplete);
fail("testRecordBiggerThanSchema should have failed on - Record has 2 fields but the schema size is 4");
} catch (BadRecordException e) {
- assertEquals(e.getMessage(), "Record has 2 fields but the schema size is 4");
+ assertEquals(e.getMessage(),
+ "Record has 2 fields but the schema size is 4");
}
}
@@ -151,16 +245,17 @@ public class BridgeOutputBuilderTest {
/* five fields instead of four */
List<OneField> complete = Arrays.asList(
- new OneField(DataType.INTEGER.getOID(), 10),
- new OneField(DataType.INTEGER.getOID(), 20),
- new OneField(DataType.INTEGER.getOID(), 30),
- new OneField(DataType.INTEGER.getOID(), 40),
- new OneField(DataType.INTEGER.getOID(), 50));
+ new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+ DataType.INTEGER.getOID(), 20), new OneField(
+ DataType.INTEGER.getOID(), 30), new OneField(
+ DataType.INTEGER.getOID(), 40), new OneField(
+ DataType.INTEGER.getOID(), 50));
try {
builder.fillGPDBWritable(complete);
fail("testRecordBiggerThanSchema should have failed on - Record has 5 fields but the schema size is 4");
} catch (BadRecordException e) {
- assertEquals(e.getMessage(), "Record has 5 fields but the schema size is 4");
+ assertEquals(e.getMessage(),
+ "Record has 5 fields but the schema size is 4");
}
}
@@ -179,25 +274,171 @@ public class BridgeOutputBuilderTest {
/* last field is REAL while schema requires INT */
List<OneField> complete = Arrays.asList(
- new OneField(DataType.INTEGER.getOID(), 10),
- new OneField(DataType.INTEGER.getOID(), 20),
- new OneField(DataType.INTEGER.getOID(), 30),
- new OneField(DataType.REAL.getOID(), 40.0));
+ new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+ DataType.INTEGER.getOID(), 20), new OneField(
+ DataType.INTEGER.getOID(), 30), new OneField(
+ DataType.REAL.getOID(), 40.0));
try {
builder.fillGPDBWritable(complete);
fail("testFieldTypeMismatch should have failed on - For field 3 schema requires type INTEGER but input record has type REAL");
} catch (BadRecordException e) {
- assertEquals(e.getMessage(), "For field col3 schema requires type INTEGER but input record has type REAL");
+ assertEquals(e.getMessage(),
+ "For field col3 schema requires type INTEGER but input record has type REAL");
+ }
+ }
+
+ @Test
+ public void convertTextDataToLines() throws Exception {
+
+ String data = "Que sara sara\n" + "Whatever will be will be\n"
+ + "We are going\n" + "to Wembeley!\n";
+ byte[] dataBytes = data.getBytes();
+ String[] dataLines = new String[] {
+ "Que sara sara\n",
+ "Whatever will be will be\n",
+ "We are going\n",
+ "to Wembeley!\n" };
+
+ OneField field = new OneField(DataType.BYTEA.getOID(), dataBytes);
+ List<OneField> fields = new ArrayList<OneField>();
+ fields.add(field);
+
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put("X-GP-ATTRS", "1");
+ addColumn(parameters, 0, DataType.TEXT, "col0");
+ // activate sampling code
+ parameters.put("X-GP-STATS-MAX-FRAGMENTS", "100");
+ parameters.put("X-GP-STATS-SAMPLE-RATIO", "1.00");
+
+ BridgeOutputBuilder builder = makeBuilder(parameters);
+ LinkedList<Writable> outputQueue = builder.makeOutput(fields);
+
+ assertEquals(4, outputQueue.size());
+
+ for (int i = 0; i < dataLines.length; ++i) {
+ Writable line = outputQueue.get(i);
+ compareBufferWritable(line, dataLines[i]);
}
- }
- private void addColumn(Map<String, String> parameters, int idx, DataType dataType, String name) {
+ assertNull(builder.getPartialLine());
+ }
+
+ @Test
+ public void convertTextDataToLinesPartial() throws Exception {
+ String data = "oh well\n" + "what the hell";
+
+ OneField field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+ List<OneField> fields = new ArrayList<OneField>();
+ fields.add(field);
+
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put("X-GP-ATTRS", "1");
+ addColumn(parameters, 0, DataType.TEXT, "col0");
+ // activate sampling code
+ parameters.put("X-GP-STATS-MAX-FRAGMENTS", "100");
+ parameters.put("X-GP-STATS-SAMPLE-RATIO", "1.00");
+
+ BridgeOutputBuilder builder = makeBuilder(parameters);
+ LinkedList<Writable> outputQueue = builder.makeOutput(fields);
+
+ assertEquals(1, outputQueue.size());
+
+ Writable line = outputQueue.get(0);
+ compareBufferWritable(line, "oh well\n");
+
+ Writable partial = builder.getPartialLine();
+ assertNotNull(partial);
+ compareBufferWritable(partial, "what the hell");
+
+ // check that append works
+ data = " but the show must go on\n" + "!!!\n";
+ field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+ fields.clear();
+ fields.add(field);
+
+ outputQueue = builder.makeOutput(fields);
+
+ assertNull(builder.getPartialLine());
+ assertEquals(2, outputQueue.size());
+
+ line = outputQueue.get(0);
+ compareBufferWritable(line, "what the hell but the show must go on\n");
+ line = outputQueue.get(1);
+ compareBufferWritable(line, "!!!\n");
+
+ // check that several partial lines gets appended to each other
+ data = "I want to ride my bicycle\n" + "I want to ride my bike";
+
+ field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+ fields.clear();
+ fields.add(field);
+
+ outputQueue = builder.makeOutput(fields);
+
+ assertEquals(1, outputQueue.size());
+
+ line = outputQueue.get(0);
+ compareBufferWritable(line, "I want to ride my bicycle\n");
+
+ partial = builder.getPartialLine();
+ assertNotNull(partial);
+ compareBufferWritable(partial, "I want to ride my bike");
+
+ // data consisting of one long line
+ data = " I want to ride my bicycle";
+
+ field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+ fields.clear();
+ fields.add(field);
+
+ outputQueue = builder.makeOutput(fields);
+
+ assertEquals(0, outputQueue.size());
+
+ partial = builder.getPartialLine();
+ assertNotNull(partial);
+ compareBufferWritable(partial,
+ "I want to ride my bike I want to ride my bicycle");
+
+ // data with lines
+ data = " bicycle BICYCLE\n" + "bicycle BICYCLE\n";
+
+ field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+ fields.clear();
+ fields.add(field);
+
+ outputQueue = builder.makeOutput(fields);
+
+ assertEquals(2, outputQueue.size());
+
+ line = outputQueue.get(0);
+ compareBufferWritable(line,
+ "I want to ride my bike I want to ride my bicycle bicycle BICYCLE\n");
+ line = outputQueue.get(1);
+ compareBufferWritable(line, "bicycle BICYCLE\n");
+
+ partial = builder.getPartialLine();
+ assertNull(partial);
+
+ }
+
+ private void compareBufferWritable(Writable line, String expected)
+ throws IOException {
+ assertTrue(line instanceof BufferWritable);
+ line.write(dos);
+ assertArrayEquals(expected.getBytes(), dos.getOutput());
+ }
+
+ private void addColumn(Map<String, String> parameters, int idx,
+ DataType dataType, String name) {
parameters.put("X-GP-ATTR-NAME" + idx, name);
- parameters.put("X-GP-ATTR-TYPECODE" + idx, Integer.toString(dataType.getOID()));
+ parameters.put("X-GP-ATTR-TYPECODE" + idx,
+ Integer.toString(dataType.getOID()));
parameters.put("X-GP-ATTR-TYPENAME" + idx, dataType.toString());
}
- private BridgeOutputBuilder makeBuilder(Map<String, String> parameters) throws Exception {
+ private BridgeOutputBuilder makeBuilder(Map<String, String> parameters)
+ throws Exception {
parameters.put("X-GP-ALIGNMENT", "8");
parameters.put("X-GP-SEGMENT-ID", "-44");
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
new file mode 100644
index 0000000..b5d4f9a
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
@@ -0,0 +1,225 @@
+package org.apache.hawq.pxf.service;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.ReadSamplingBridge;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ AnalyzeUtils.class, ReadSamplingBridge.class })
+public class ReadSamplingBridgeTest {
+
+ /**
+ * Writable test object to test ReadSamplingBridge. The object receives a
+ * string and returns it in its toString function.
+ */
+ public class WritableTest implements Writable {
+
+ private String data;
+
+ public WritableTest(String data) {
+ this.data = data;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public String toString() {
+ return data;
+ }
+
+ }
+
+ private ProtocolData mockProtData;
+ private ReadBridge mockBridge;
+ private ReadSamplingBridge readSamplingBridge;
+ private int recordsLimit = 0;
+ private BitSet samplingBitSet;
+ private Writable result;
+
+ @Test
+ public void getNextRecord100Percent() throws Exception {
+
+ samplingBitSet.set(0, 100);
+ recordsLimit = 100;
+ when(mockProtData.getStatsSampleRatio()).thenReturn((float) 1.0);
+
+ readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+ result = readSamplingBridge.getNext();
+ assertEquals("0", result.toString());
+
+ result = readSamplingBridge.getNext();
+ assertEquals("1", result.toString());
+
+ when(mockBridge.getNext()).thenReturn(null);
+
+ result = readSamplingBridge.getNext();
+ assertNull(result);
+ }
+
+ @Test
+ public void getNextRecord100Records10Percent() throws Exception {
+
+ // set 10 bits from 5 to 14.
+ samplingBitSet.set(5, 15);
+ recordsLimit = 100;
+ when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.1);
+
+ readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+ for (int i = 0; i < 10; i++) {
+ result = readSamplingBridge.getNext();
+ assertEquals("" + (i + 5), result.toString());
+ }
+
+ result = readSamplingBridge.getNext();
+ assertNull(result);
+ }
+
+ @Test
+ public void getNextRecord100Records90Percent() throws Exception {
+ int expected = 0;
+
+ // set the odd bits below 20 (1, 3, ..., 19), then every bit from 20 to 99
+ // total: 10 + 80 = 90.
+ samplingBitSet.set(0, 100);
+ for (int i = 0; i < 10; i++) {
+ samplingBitSet.flip(i * 2);
+ }
+ recordsLimit = 100;
+ when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.9);
+
+ readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+ for (int i = 0; i < 90; i++) {
+ result = readSamplingBridge.getNext();
+ if (i < 10) {
+ expected = (i * 2) + 1;
+ } else {
+ expected = i + 10;
+ }
+ assertEquals("" + expected, result.toString());
+ }
+ result = readSamplingBridge.getNext();
+ assertNull(result);
+ }
+
+ @Test
+ public void getNextRecord350Records50Percent() throws Exception {
+
+ // set bits 0 (1 bit), 40-79 (40 bits) and 90-98 (9 bits)
+ // total 50.
+ samplingBitSet.set(0);
+ samplingBitSet.set(40, 80);
+ samplingBitSet.set(90, 99);
+ recordsLimit = 350;
+ when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.5);
+
+ readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+ /*
+ * expecting to have:
+ * 50 (out of first 100)
+ * 50 (out of second 100)
+ * 50 (out of third 100)
+ * 11 (out of last 50: bit 0 and bits 40-49)
+ * ---
+ * 161 records
+ *
+ * Only every 50th returned record is checked by value: it is expected
+ * to be the first record of a 100-record window, i.e. record i * 2.
+ */
+ for (int i = 0; i < 161; i++) {
+ result = readSamplingBridge.getNext();
+ assertNotNull(result);
+ if (i % 50 == 0) {
+ assertEquals("" + (i * 2), result.toString());
+ }
+ }
+ result = readSamplingBridge.getNext();
+ assertNull(result);
+ }
+
+ @Test
+ public void getNextRecord100000Records30Sample() throws Exception {
+ int expected = 0;
+
+ // ratio = 0.0003
+ float ratio = (float) (30.0 / 100000.0);
+
+ // set 3 records in every 10000.
+ samplingBitSet.set(99);
+ samplingBitSet.set(999);
+ samplingBitSet.set(9999);
+ recordsLimit = 100000;
+ when(mockProtData.getStatsSampleRatio()).thenReturn(ratio);
+
+ readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+ for (int i = 0; i < 30; i++) {
+ result = readSamplingBridge.getNext();
+ assertNotNull(result);
+ int residue = i % 3;
+ int div = i / 3;
+ if (residue == 0) {
+ expected = 99 + (div * 10000);
+ } else if (residue == 1) {
+ expected = 999 + (div * 10000);
+ } else {
+ expected = 9999 + (div * 10000);
+ }
+ assertEquals("" + expected, result.toString());
+ }
+ result = readSamplingBridge.getNext();
+ assertNull(result);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ mockProtData = mock(ProtocolData.class);
+
+ mockBridge = mock(ReadBridge.class);
+ PowerMockito.whenNew(ReadBridge.class).withAnyArguments().thenReturn(
+ mockBridge);
+
+ when(mockBridge.getNext()).thenAnswer(new Answer<Writable>() {
+ private int count = 0;
+
+ @Override
+ public Writable answer(InvocationOnMock invocation)
+ throws Throwable {
+ if (count >= recordsLimit) {
+ return null;
+ }
+ return new WritableTest("" + (count++));
+ }
+ });
+
+ PowerMockito.mockStatic(AnalyzeUtils.class);
+ samplingBitSet = new BitSet();
+ when(
+ AnalyzeUtils.generateSamplingBitSet(any(int.class),
+ any(int.class))).thenReturn(samplingBitSet);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
new file mode 100644
index 0000000..984fcf6
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
@@ -0,0 +1,22 @@
+package org.apache.hawq.pxf.service.io;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class BufferWritableTest {
+
+ /**
+ * Checks that a BufferWritable exposes the bytes it was created with and
+ * that {@code append} leaves the buffer equal to the original bytes
+ * followed by the appended bytes, using multi-byte (Hebrew) text.
+ */
+ @Test
+ public void append() throws Exception {
+ // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform
+ // default charset, which may not be able to represent Hebrew and
+ // would then reduce the test data to '?' bytes.
+ java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+ String data1 = "פרק ראשון ובו יסופר יסופר";
+ String data2 = "פרק שני ובו יסופר יסופר";
+
+ BufferWritable bw1 = new BufferWritable(data1.getBytes(utf8));
+
+ assertArrayEquals(data1.getBytes(utf8), bw1.buf);
+
+ bw1.append(data2.getBytes(utf8));
+
+ assertArrayEquals((data1 + data2).getBytes(utf8), bw1.buf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
new file mode 100644
index 0000000..39bb266
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
@@ -0,0 +1,117 @@
+package org.apache.hawq.pxf.service.utilities;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+public class AnalyzeUtilsTest {
+
+ @Test
+ public void generateSamplingBitSet() throws Exception {
+ runGenerateSamplingBitSetTest(10, 5, new int[]{0, 3, 4, 6, 9});
+
+ runGenerateSamplingBitSetTest(9, 8, new int[] {0, 2, 3, 4, 5, 6, 7, 8});
+
+ runGenerateSamplingBitSetTest(10, 10, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+ runGenerateSamplingBitSetTest(8, 0, new int[]{});
+
+ runGenerateSamplingBitSetTest(8, 3, new int[]{0, 3, 6});
+ }
+
+ @Test
+ public void generateSamplingBitSetBig() throws Exception {
+ // assertEquals takes (expected, actual); keeping that order makes a
+ // failure report the requested sample size as the expected value.
+ BitSet result = AnalyzeUtils.generateSamplingBitSet(1000000, 990000);
+ assertEquals(990000, result.cardinality());
+ assertTrue(result.length() < 1000000);
+
+ result = AnalyzeUtils.generateSamplingBitSet(1000000000, 5000000);
+ assertEquals(5000000, result.cardinality());
+ assertTrue(result.length() < 1000000000);
+ }
+
+ @Test
+ public void getSampleFragments() throws Exception {
+ // fragments less than threshold
+ runGetSampleFragmentsTest(4, 100, 4, new int[] {0, 1, 2, 3});
+
+ // fragments over threshold
+ runGetSampleFragmentsTest(4, 2, 2, new int[]{0, 3});
+ runGetSampleFragmentsTest(10, 2, 2, new int[]{0, 6});
+ runGetSampleFragmentsTest(10, 3, 3, new int[]{0, 4, 8});
+ runGetSampleFragmentsTest(10, 9, 9, new int[]{0, 1, 2, 4, 5, 6, 7, 8, 9 });
+ runGetSampleFragmentsTest(15, 10, 10, new int[]{0, 2, 3, 4, 6, 7, 8, 10, 12, 14});
+ runGetSampleFragmentsTest(1000, 10, 10,
+ new int[]{0, 101, 202, 303, 404, 505, 606, 707, 808, 909});
+ runGetSampleFragmentsTest(100, 65, 65,
+ new int[]{0, 1, 2, 4, 5, 6, 8, 9, 10, /* 9 elements */
+ 12, 13, 14, 16, 17, 18, /* 6 elements */
+ 20, 21, 22, 24, 25, 26, 28, 29, /* 8 elements */
+ 30, 32, 33, 34, 36, 37, 38, /* 7 elements */
+ 40, 41, 42, 44, 45, 46, 48, 49, /* 8 elements */
+ 50, 52, 53, 54, 56, 57, 58, /* 7 elements */
+ 60, 62, 64, 66, 68, /* 5 elements */
+ 70, 72, 74, 76, 78, /* 5 elements */
+ 80, 82, 84, 86, 88, /* 5 elements */
+ 90, 92, 94, 96, 98 /* 5 elements */
+ });
+ /* => 65 elements */
+ // threshold illegal and ignored
+ runGetSampleFragmentsTest(10, 0, 10, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+ }
+
+ private void runGenerateSamplingBitSetTest(int poolSize, int sampleSize, int[] expectedIndexes) throws Exception {
+ BitSet expected = new BitSet();
+ for (int i: expectedIndexes) {
+ expected.set(i);
+ }
+ BitSet result = AnalyzeUtils.generateSamplingBitSet(poolSize, sampleSize);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ private void runGetSampleFragmentsTest(int inputSize, int maxFragments, int expectedSize, int[] expectedIndexes) throws Exception {
+ ProtocolData mockProtocolData = mock(ProtocolData.class);
+ when(mockProtocolData.getStatsMaxFragments()).thenReturn(maxFragments);
+
+ List<Fragment> fragments = new ArrayList<Fragment>();
+
+ for (int i = 0; i < inputSize; i++) {
+ fragments.add(prepareFragment(i));
+ }
+ assertEquals(inputSize, fragments.size());
+
+ List<Fragment> result = AnalyzeUtils.getSampleFragments(fragments, mockProtocolData);
+
+ List<Fragment> expected = new ArrayList<Fragment>();
+
+ for (int i: expectedIndexes) {
+ expected.add(prepareFragment(i));
+ }
+
+ assertEquals("verify number of returned fragments", expectedSize, result.size());
+
+ for (int i = 0; i < expectedSize; i++) {
+ Assert.assertEquals("compare fragment #" + i, expected.get(i).getIndex(), result.get(i).getIndex());
+ }
+ }
+
+ private Fragment prepareFragment(int i) {
+ Fragment fragment = new Fragment("fragment" + i, null, null);
+ fragment.setIndex(i);
+ return fragment;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
index 194f148..bd31b19 100644
--- a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({UserGroupInformation.class, ProfilesConf.class})
+@PrepareForTest({ UserGroupInformation.class, ProfilesConf.class })
public class ProtocolDataTest {
Map<String, String> parameters;
@@ -47,7 +47,8 @@ public class ProtocolDataTest {
assertEquals(protocolData.getAccessor(), "are");
assertEquals(protocolData.getResolver(), "packed");
assertEquals(protocolData.getDataSource(), "i'm/ready/to/go");
- assertEquals(protocolData.getUserProperty("i'm-standing-here"), "outside-your-door");
+ assertEquals(protocolData.getUserProperty("i'm-standing-here"),
+ "outside-your-door");
assertEquals(protocolData.getParametersMap(), parameters);
assertNull(protocolData.getLogin());
assertNull(protocolData.getSecret());
@@ -66,10 +67,12 @@ public class ProtocolDataTest {
Map<String, String> mockedProfiles = new HashMap<>();
mockedProfiles.put("wHEn you trY yOUR bESt", "but you dont succeed");
- mockedProfiles.put("when YOU get WHAT you WANT", "but not what you need");
+ mockedProfiles.put("when YOU get WHAT you WANT",
+ "but not what you need");
mockedProfiles.put("when you feel so tired", "but you cant sleep");
- when(ProfilesConf.getProfilePluginsMap("a profile")).thenReturn(mockedProfiles);
+ when(ProfilesConf.getProfilePluginsMap("a profile")).thenReturn(
+ mockedProfiles);
parameters.put("x-gp-profile", "a profile");
parameters.put("when you try your best", "and you do succeed");
@@ -79,7 +82,8 @@ public class ProtocolDataTest {
new ProtocolData(parameters);
fail("Duplicate property should throw IllegalArgumentException");
} catch (IllegalArgumentException iae) {
- assertEquals("Profile 'a profile' already defines: [when YOU get WHAT you WANT, wHEn you trY yOUR bESt]",
+ assertEquals(
+ "Profile 'a profile' already defines: [when YOU get WHAT you WANT, wHEn you trY yOUR bESt]",
iae.getMessage());
}
}
@@ -169,8 +173,9 @@ public class ProtocolDataTest {
new ProtocolData(parameters);
fail("should fail with bad fragment metadata");
} catch (Exception e) {
- assertEquals(e.getMessage(), "Fragment metadata information must be Base64 encoded." +
- "(Bad value: " + badValue + ")");
+ assertEquals(e.getMessage(),
+ "Fragment metadata information must be Base64 encoded."
+ + "(Bad value: " + badValue + ")");
}
}
@@ -197,6 +202,116 @@ public class ProtocolDataTest {
assertEquals("UTF8_計算機用語_00000000", protocolData.getFilterString());
}
+ /**
+  * Without any X-GP-STATS-* parameter both statistics values must stay at
+  * their "unset" value of 0.
+  */
+ @Test
+ public void noStatsParams() {
+     ProtocolData protData = new ProtocolData(parameters);
+
+     assertEquals(0, protData.getStatsMaxFragments());
+     // The smallest legal sample ratio is 0.0001, so the tolerance must be
+     // below that to tell "unset" apart from a (wrongly) set ratio.
+     assertEquals(0, protData.getStatsSampleRatio(), 0.00001);
+ }
+
+ /**
+  * Both statistics parameters supplied with valid values are parsed and
+  * exposed through their getters.
+  */
+ @Test
+ public void statsParams() {
+     parameters.put("X-GP-STATS-MAX-FRAGMENTS", "10101");
+     parameters.put("X-GP-STATS-SAMPLE-RATIO", "0.039");
+
+     ProtocolData protData = new ProtocolData(parameters);
+
+     assertEquals(10101, protData.getStatsMaxFragments());
+     // Tolerance only has to absorb float rounding of 0.039; a wider one
+     // (e.g. 0.01) would also accept clearly wrong ratios such as 0.03.
+     assertEquals(0.039, protData.getStatsSampleRatio(), 0.000001);
+ }
+
+ /**
+  * Supplying only one of STATS-MAX-FRAGMENTS / STATS-SAMPLE-RATIO must be
+  * rejected with an IllegalArgumentException, whichever one is missing.
+  */
+ @Test
+ public void statsMissingParams() {
+     // only max fragments given
+     parameters.put("X-GP-STATS-MAX-FRAGMENTS", "13");
+     try {
+         new ProtocolData(parameters);
+         fail("missing X-GP-STATS-SAMPLE-RATIO parameter");
+     } catch (IllegalArgumentException e) {
+         // expected value first, so a failure report labels the values correctly
+         assertEquals(
+                 "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together",
+                 e.getMessage());
+     }
+
+     // only sample ratio given
+     parameters.remove("X-GP-STATS-MAX-FRAGMENTS");
+     parameters.put("X-GP-STATS-SAMPLE-RATIO", "1");
+     try {
+         new ProtocolData(parameters);
+         fail("missing X-GP-STATS-MAX-FRAGMENTS parameter");
+     } catch (IllegalArgumentException e) {
+         assertEquals(
+                 "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together",
+                 e.getMessage());
+     }
+ }
+
+ /**
+  * Negative test for STATS-SAMPLE-RATIO: values above 1.0, zero, values
+  * below 0.0001 and non-numeric input must all be rejected.
+  */
+ @Test
+ public void statsSampleRatioNegative() {
+     // too large
+     parameters.put("X-GP-STATS-SAMPLE-RATIO", "101");
+
+     try {
+         new ProtocolData(parameters);
+         fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+     } catch (IllegalArgumentException e) {
+         // expected value first, so a failure report labels the values correctly
+         assertEquals(
+                 "Wrong value '101.0'. "
+                         + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0",
+                 e.getMessage());
+     }
+
+     // zero
+     parameters.put("X-GP-STATS-SAMPLE-RATIO", "0");
+     try {
+         new ProtocolData(parameters);
+         fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+     } catch (IllegalArgumentException e) {
+         assertEquals(
+                 "Wrong value '0.0'. "
+                         + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0",
+                 e.getMessage());
+     }
+
+     // below the minimum of 0.0001
+     parameters.put("X-GP-STATS-SAMPLE-RATIO", "0.00005");
+     try {
+         new ProtocolData(parameters);
+         fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+     } catch (IllegalArgumentException e) {
+         assertEquals(
+                 "Wrong value '5.0E-5'. "
+                         + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0",
+                 e.getMessage());
+     }
+
+     // not a number
+     parameters.put("X-GP-STATS-SAMPLE-RATIO", "a");
+     try {
+         new ProtocolData(parameters);
+         fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+     } catch (NumberFormatException e) {
+         assertEquals("For input string: \"a\"", e.getMessage());
+     }
+ }
+
+ /**
+  * Negative test for STATS-MAX-FRAGMENTS: a non-integer value and zero
+  * must both be rejected.
+  */
+ @Test
+ public void statsMaxFragmentsNegative() {
+     // not an integer
+     parameters.put("X-GP-STATS-MAX-FRAGMENTS", "10.101");
+
+     try {
+         new ProtocolData(parameters);
+         fail("wrong X-GP-STATS-MAX-FRAGMENTS value");
+     } catch (NumberFormatException e) {
+         // expected value first, so a failure report labels the values correctly
+         assertEquals("For input string: \"10.101\"", e.getMessage());
+     }
+
+     // not positive
+     parameters.put("X-GP-STATS-MAX-FRAGMENTS", "0");
+
+     try {
+         new ProtocolData(parameters);
+         fail("wrong X-GP-STATS-MAX-FRAGMENTS value");
+     } catch (IllegalArgumentException e) {
+         assertEquals("Wrong value '0'. "
+                 + "STATS-MAX-FRAGMENTS must be a positive integer",
+                 e.getMessage());
+     }
+ }
+
/*
* setUp function called before each test
*/
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/access/external/Makefile b/src/backend/access/external/Makefile
index afc3734..fec5a34 100644
--- a/src/backend/access/external/Makefile
+++ b/src/backend/access/external/Makefile
@@ -10,7 +10,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = fileam.o url.o libchurl.o hd_work_mgr.o pxfuriparser.o pxfheaders.o \
-pxfmasterapi.o ha_config.o pxfcomutils.o pxfutils.o pxffilters.o
+pxfmasterapi.o ha_config.o pxfcomutils.o pxfutils.o pxffilters.o pxfanalyze.o
include $(top_srcdir)/src/backend/common.mk
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c
index 36f11a8..6aa0736 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -233,51 +233,41 @@ static void assign_pxf_port_to_fragments(int remote_rest_port, List *fragments)
}
/*
- * Fetches statistics of the PXF datasource from the PXF service
+ * Fetches fragments statistics of the PXF datasource from the PXF service
*
- * The function will generate a delegation token when secure filesystem mode
+ * The function will generate a delegation token when secure filesystem mode
* is on and cancel it right after.
*/
-PxfStatsElem *get_pxf_statistics(char *uri, Relation rel, StringInfo err_msg)
+PxfFragmentStatsElem *get_pxf_fragments_statistics(char *uri, Relation rel)
{
ClientContext client_context; /* holds the communication info */
char *analyzer = NULL;
char *profile = NULL;
PxfInputData inputData = {0};
- PxfStatsElem *result = NULL;
-
+ PxfFragmentStatsElem *result = NULL;
+
GPHDUri* hadoop_uri = init(uri, &client_context);
if (!hadoop_uri)
- return NULL;
-
- /*
- * Get the statistics info from REST only if analyzer is defined
- */
- if(GPHDUri_get_value_for_opt(hadoop_uri, "analyzer", &analyzer, false) != 0 &&
- GPHDUri_get_value_for_opt(hadoop_uri, "profile", &profile, false) != 0)
{
- if (err_msg)
- appendStringInfo(err_msg, "no ANALYZER or PROFILE option in table definition");
- return NULL;
+ elog(ERROR, "Failed to parse PXF location %s", uri);
}
-
+
/*
* Enrich the curl HTTP header
*/
inputData.headers = client_context.http_headers;
inputData.gphduri = hadoop_uri;
- inputData.rel = rel;
- inputData.filterstr = NULL; /* We do not supply filter data to the HTTP header */
+ inputData.rel = rel;
+ inputData.filterstr = NULL; /* We do not supply filter data to the HTTP header */
generate_delegation_token(&inputData);
build_http_header(&inputData);
-
- result = get_data_statistics(hadoop_uri, &client_context, err_msg);
+
+ result = get_fragments_statistics(hadoop_uri, &client_context);
cancel_delegation_token(&inputData);
return result;
}
-
/*
* Preliminary uri parsing and curl initializations for the REST communication
*/
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/pxfanalyze.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfanalyze.c b/src/backend/access/external/pxfanalyze.c
new file mode 100644
index 0000000..eb5a17a
--- /dev/null
+++ b/src/backend/access/external/pxfanalyze.c
@@ -0,0 +1,740 @@
+/*-------------------------------------------------------------------------
+ *
+ * pxfanalyze.c
+ * Helper functions to perform ANALYZE on PXF tables.
+ *
+ * Copyright (c) 2007-2012, Greenplum inc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <curl/curl.h>
+#include <json/json.h>
+#include "access/hd_work_mgr.h"
+#include "access/pxfanalyze.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_exttable.h"
+#include "cdb/cdbanalyze.h"
+#include "commands/analyzeutils.h"
+#include "lib/stringinfo.h"
+#include "nodes/makefuncs.h"
+#include "utils/builtins.h"
+#include "utils/elog.h"
+#include "utils/guc.h"
+#include "utils/lsyscache.h"
+
+
+static void buildPxfTableCopy(Oid relationOid,
+ float4 samplingRatio,
+ int pxfStatMaxFragments,
+ const char* schemaName, const char* tableName,
+ const char* sampleSchemaName, const char* pxfSampleTable);
+static void buildSampleFromPxf(const char* sampleSchemaName,
+ const char* sampleTableName,
+ const char* pxfSampleTable,
+ List *lAttributeNames,
+ float4 *sampleTableRelTuples);
+
+static float4 calculateSamplingRatio(float4 relTuples,
+ float4 relFrags,
+ float4 requestedSampleSize);
+
+static char* parseFormat(char fmtcode);
+static char* escape_unprintables(const char *src);
+static char* escape_fmtopts_string(const char *src);
+static char* custom_fmtopts_string(const char *src);
+static void printExtTable(Oid relationOid, ExtTableEntry* extTable);
+static char* createPxfSampleStmt(Oid relationOid,
+ const char* schemaName, const char* tableName,
+ const char* sampleSchemaName, const char* pxfSampleTable,
+ float4 pxf_sample_ratio, int pxf_max_fragments);
+static float4 getPxfFragmentTupleCount(Oid relationOid);
+static float4 countFirstFragmentTuples(const char* schemaName,
+ const char* tableName);
+static void getFragmentStats(Relation rel, StringInfo location,
+ float4 *numfrags, float4 *firstfragsize,
+ float4 *totalsize);
+
+
+/*
+ * Estimates reltuples and relpages for a PXF table.
+ *
+ * The fragment statistics (fragment count, first fragment size, total size)
+ * are fetched via getFragmentStats, and the first fragment's tuples are
+ * counted via getPxfFragmentTupleCount. The tuple estimate is extrapolated:
+ *     tuples = (total size / first fragment size) * first fragment tuples
+ * The page estimate is the number of fragments.
+ *
+ * Output:
+ *     estimatedRelTuples - estimated number of tuples
+ *     estimatedRelPages  - estimated number of pages (fragments)
+ */
+void analyzePxfEstimateReltuplesRelpages(Relation relation,
+        StringInfo location,
+        float4* estimatedRelTuples,
+        float4* estimatedRelPages)
+{
+    float4 fragCount = 0.0;
+    float4 firstSize = 0.0;
+    float4 allSize = 0.0;
+    float4 firstTuples = 0.0;
+    float4 tuplesGuess = 0.0;
+    float4 pagesGuess = 0.0;
+
+    /* fragment count, size of the first fragment, size of the whole source */
+    getFragmentStats(relation, location, &fragCount, &firstSize, &allSize);
+
+    /* number of tuples found in the first fragment */
+    firstTuples = getPxfFragmentTupleCount(relation->rd_id);
+
+    if (firstTuples > 0)
+    {
+        Assert(firstSize > 0);
+        Assert(allSize > 0);
+        /*
+         * tuple size = first fragment size / first fragment tuples, hence
+         * tuple count = total size / tuple size.
+         */
+        tuplesGuess = (allSize / firstSize) * firstTuples;
+    }
+
+    elog(DEBUG2, "Estimated tuples for PXF table: %f. (first fragment count %f, fragments number %f, old estimate %f)",
+         tuplesGuess, firstTuples, fragCount, *estimatedRelTuples);
+
+    pagesGuess = fragCount;
+
+    /* a table holding tuples occupies at least one page */
+    if ((pagesGuess < 1.0) && (tuplesGuess > 0))
+        pagesGuess = 1.0;
+
+    /* negative values fall back to the external table defaults */
+    if (pagesGuess < 0)
+        pagesGuess = gp_external_table_default_number_of_pages;
+    if (tuplesGuess < 0)
+        tuplesGuess = gp_external_table_default_number_of_tuples;
+
+    *estimatedRelTuples = tuplesGuess;
+    *estimatedRelPages = pagesGuess;
+}
+
+/*
+ * Creates a sample table filled with data from a PXF table.
+ *
+ * Steps, in order:
+ *  1. compute the per-fragment sampling ratio (calculateSamplingRatio);
+ *  2. create a copy of the PXF table in pg_temp whose LOCATION carries the
+ *     sampling ratio and the GUC pxf_stat_max_fragments (buildPxfTableCopy);
+ *  3. materialize the requested attributes of that copy into
+ *     pg_temp.<sampleTableName> (buildSampleFromPxf);
+ *  4. drop the PXF copy.
+ *
+ * Input:
+ *     relationOid - relation to be sampled
+ *     sampleTableName - name of the sample table to create
+ *     lAttributeNames - attributes to be included in the sample
+ *     relTuples - estimated number of tuples in the relation
+ *     relFrags - estimated number of fragments in the relation
+ *     requestedSampleSize - number of sample tuples wanted
+ * Output:
+ *     sampleTableRelTuples - number of tuples in the sample table created
+ * Returns the Oid of the sample table. Errors out when the sample is empty.
+ */
+Oid buildPxfSampleTable(Oid relationOid,
+        char* sampleTableName,
+        List *lAttributeNames,
+        float4 relTuples,
+        float4 relFrags,
+        float4 requestedSampleSize,
+        float4 *sampleTableRelTuples)
+{
+    const char *origSchema = get_namespace_name(get_rel_namespace(relationOid)); /* pfreed below */
+    const char *origTable = get_rel_name(relationOid); /* pfreed below */
+    char *tempSchema = pstrdup("pg_temp");
+    char *pxfCopyName = temporarySampleTableName(relationOid, "pg_analyze_pxf"); /* pfreed below */
+    RangeVar *lookup = NULL;
+    Oid pxfCopyOid = InvalidOid;
+    Oid resultOid = InvalidOid;
+    float4 ratio = 0.0;
+
+    Assert(requestedSampleSize > 0.0);
+    Assert(relTuples > 0.0);
+    Assert(relFrags > 0.0);
+
+    /* step 1: sampling ratio */
+    ratio = calculateSamplingRatio(relTuples, relFrags, requestedSampleSize);
+
+    /* step 2: PXF copy carrying the sampling parameters */
+    buildPxfTableCopy(relationOid,
+                      ratio,
+                      pxf_stat_max_fragments,
+                      origSchema, origTable,
+                      tempSchema, pxfCopyName);
+
+    lookup = makeRangeVar(NULL /*catalogname*/, tempSchema, pxfCopyName, -1);
+    pxfCopyOid = RangeVarGetRelid(lookup, true /* failOK */, false /*allowHcatalog*/);
+
+    /* step 3: the actual sample table */
+    buildSampleFromPxf(tempSchema, sampleTableName, pxfCopyName,
+                       lAttributeNames, sampleTableRelTuples);
+
+    lookup = makeRangeVar(NULL /*catalogname*/, tempSchema, sampleTableName, -1);
+    resultOid = RangeVarGetRelid(lookup, true /* failOK */, false /*allowHcatalog*/);
+
+    Assert(resultOid != InvalidOid);
+
+    /*
+     * MPP-10723: very rarely the sample comes out empty. Error out instead
+     * of producing bad statistics.
+     */
+    if (*sampleTableRelTuples < 1.0)
+    {
+        elog(ERROR, "ANALYZE unable to generate accurate statistics on table %s.%s. Try lowering gp_analyze_relative_error",
+             quote_identifier(origSchema),
+             quote_identifier(origTable));
+    }
+
+    /* step 4: the PXF copy is no longer needed */
+    if (pxfCopyOid != InvalidOid)
+    {
+        elog(DEBUG2, "ANALYZE dropping PXF sample table");
+        dropSampleTable(pxfCopyOid, true);
+    }
+
+    pfree((void *) lookup);
+    pfree((void *) pxfCopyName);
+    pfree((void *) origTable);
+    pfree((void *) origSchema);
+    pfree((void *) tempSchema);
+
+    return resultOid;
+}
+
+/*
+ * Creates an external PXF table that copies the given PXF table.
+ * The CREATE statement is produced by createPxfSampleStmt (which adds the
+ * sampling ratio and max fragments to the location) and executed via SPI.
+ *
+ * Input:
+ *     relationOid - PXF relation to copy
+ *     samplingRatio - sampling ratio passed on to createPxfSampleStmt
+ *     pxfStatMaxFragments - max fragments passed on to createPxfSampleStmt
+ *     schemaName, tableName - qualified name of the original table
+ *     sampleSchemaName, pxfSampleTable - qualified name of the copy
+ */
+static void buildPxfTableCopy(Oid relationOid,
+        float4 samplingRatio,
+        int pxfStatMaxFragments,
+        const char* schemaName, const char* tableName,
+        const char* sampleSchemaName, const char* pxfSampleTable)
+{
+    char *createSql = NULL;
+
+    /* build the CREATE EXTERNAL TABLE statement ... */
+    createSql = createPxfSampleStmt(relationOid,
+                                    schemaName, tableName,
+                                    sampleSchemaName, pxfSampleTable,
+                                    samplingRatio, pxfStatMaxFragments);
+
+    /* ... and run it; no result callback is needed */
+    spiExecuteWithCallback(createSql, false /*readonly*/, 0 /*tcount */,
+                           NULL, NULL);
+
+    pfree(createSql);
+
+    elog(DEBUG2, "Created PXF table %s.%s for sampling PXF table %s.%s",
+         quote_identifier(sampleSchemaName),
+         quote_identifier(pxfSampleTable),
+         quote_identifier(schemaName),
+         quote_identifier(tableName));
+}
+
+/*
+ * Creates and populates the sample table with
+ *     create table <schema>.<sampleTableName> as
+ *         (select Ta.<attr>, ... from <schema>.<pxfSampleTable> as Ta)
+ *     distributed randomly
+ * The table that is queried is the PXF copy (pxfSampleTable), not the
+ * original PXF table.
+ *
+ * Output:
+ *     sampleTableRelTuples - number of rows processed by the statement
+ */
+static void buildSampleFromPxf(const char* sampleSchemaName,
+        const char* sampleTableName,
+        const char* pxfSampleTable,
+        List *lAttributeNames,
+        float4 *sampleTableRelTuples)
+{
+    StringInfoData sql;
+    ListCell *cell = NULL;
+    int pos = 0;
+
+    initStringInfo(&sql);
+
+    appendStringInfo(&sql, "create table %s.%s as (select ",
+                     quote_identifier(sampleSchemaName), quote_identifier(sampleTableName));
+
+    /* comma-separated attribute list, followed by a single blank */
+    foreach_with_count(cell, lAttributeNames, pos)
+    {
+        if (pos > 0)
+        {
+            appendStringInfo(&sql, ", ");
+        }
+        appendStringInfo(&sql, "Ta.%s", quote_identifier((const char *) lfirst(cell)));
+    }
+    if (list_length(lAttributeNames) > 0)
+    {
+        appendStringInfo(&sql, " ");
+    }
+
+    appendStringInfo(&sql, "from %s.%s as Ta) distributed randomly",
+                     quote_identifier(sampleSchemaName),
+                     quote_identifier(pxfSampleTable));
+
+    /* in case of PXF error, analyze on this table will be reverted */
+    spiExecuteWithCallback(sql.data, false /*readonly*/, 0 /*tcount */,
+                           spiCallback_getProcessedAsFloat4, sampleTableRelTuples);
+
+    pfree(sql.data);
+
+    elog(DEBUG2, "Created sample table %s.%s with nrows=%.0f",
+         quote_identifier(sampleSchemaName),
+         quote_identifier(sampleTableName),
+         *sampleTableRelTuples);
+}
+
+/*
+ * Returns a sampling ratio - a fraction between 0.0001 and 1.0
+ * representing how many samples should be returned from each fragment
+ * of a PXF table.
+ * The ratio is calculated based on the tuples estimate of the table
+ * and on the number of the actually sampled fragments
+ * (GUC pxf_stat_max_fragments), by the following formula:
+ * ratio = (<sample size> / <tuples estimate>) * (<total # fragments> / <fragments to be sampled>)
+ * The fragments factor is applied only when fewer fragments are sampled
+ * than the table has.
+ * The result is always clamped to [0.0001, 1.0], which is the range the
+ * PXF service accepts for STATS-SAMPLE-RATIO.
+ *
+ * Input:
+ *     relTuples - number of tuples in the table
+ *     relFrags - number of fragments in the table
+ *     requestedSampleSize - number of sample tuples required
+ * Output:
+ *     the sampling ratio for the table.
+ */
+static float4 calculateSamplingRatio(float4 relTuples,
+        float4 relFrags,
+        float4 requestedSampleSize)
+{
+    float4 sampleRatio = 0.0;
+
+    Assert(relFrags > 0);
+    Assert(relTuples > 0);
+    Assert(requestedSampleSize > 0);
+
+    /* sample ratio for regular tables */
+    sampleRatio = requestedSampleSize / relTuples;
+
+    if (pxf_stat_max_fragments < relFrags)
+    {
+        /*
+         * Correct ratio according to the number of sampled fragments.
+         * If there are less fragments to sample, the ratio should be increased.
+         */
+        sampleRatio = sampleRatio * (relFrags / pxf_stat_max_fragments);
+    }
+
+    /*
+     * Upper bound: never ask for more than 100%. This is checked
+     * unconditionally, because the plain ratio can already exceed 1.0
+     * when the requested sample is larger than the tuples estimate.
+     */
+    if (sampleRatio > 1.0)
+    {
+        sampleRatio = 1.0;
+    }
+
+    /*
+     * Lower bound: if the ratio is too low (< 0.0001), correct it to 0.0001.
+     * That means that the lowest rate we will get is 1 tuple per 10,000.
+     */
+    if (sampleRatio < 0.0001)
+    {
+        sampleRatio = 0.0001;
+    }
+
+    elog(DEBUG2, "PXF ANALYZE: pxf_stats_sample_ratio = %f, pxf_stats_max_fragments = %d, table fragments = %f",
+         sampleRatio, pxf_stat_max_fragments, relFrags);
+    return sampleRatio;
+}
+
+/*
+ * Maps an external table format code to its display name
+ * ("CUSTOM", "TEXT" or "CSV"). Raises an ERROR for any other code.
+ */
+static char* parseFormat(char fmtcode)
+{
+    char *formatName = NULL;
+
+    if (fmttype_is_custom(fmtcode))
+    {
+        formatName = "CUSTOM";
+    }
+    else if (fmttype_is_text(fmtcode))
+    {
+        formatName = "TEXT";
+    }
+    else if (fmttype_is_csv(fmtcode))
+    {
+        formatName = "CSV";
+    }
+    else
+    {
+        elog(ERROR, "Unrecognized external table format '%c'", fmtcode);
+    }
+
+    return formatName;
+}
+
+/* Helper functions from dumputils.c, modified to backend (malloc->palloc) */
+
+/*
+ * Escape any unprintables (0x00 - 0x1F), except TAB, in given string.
+ * Each such byte is replaced by the 4 characters \xNN (hex value of the
+ * byte); every other byte, including bytes >= 0x80 (e.g. multi-byte
+ * encoded characters), is copied unchanged.
+ *
+ * Returns a palloc'd, NUL-terminated string.
+ */
+static char *
+escape_unprintables(const char *src)
+{
+    int len = strlen(src);
+    int i;
+    int j;
+    /* worst case: every source byte expands to the 4 bytes of "\xNN" */
+    char *result = palloc0(len * 4 + 1);
+
+    for (i = 0, j = 0; i < len; i++)
+    {
+        /*
+         * Compare as unsigned: with a signed char, bytes >= 0x80 are
+         * negative and would wrongly be treated as unprintable.
+         */
+        unsigned char c = (unsigned char) src[i];
+
+        if ((c <= 0x1F) && (c != 0x09 /* TAB */))
+        {
+            /* writes 4 characters plus a NUL that the next write overwrites */
+            snprintf(&(result[j]), 5, "\\x%02X", c);
+            j += 4;
+        }
+        else
+            result[j++] = src[i];
+    }
+    result[j] = '\0';
+    return result;
+}
+
+/*
+ * Escape backslashes and apostrophes in EXTERNAL TABLE format strings.
+ *
+ * The fmtopts field of a pg_exttable tuple has an odd encoding -- it is
+ * partially parsed and contains "string" values that aren't legal SQL.
+ * Each string value is delimited by apostrophes and is usually, but not
+ * always, a single character. The fmtopts field is typically something
+ * like {delimiter '\x09' null '\N' escape '\'} or
+ * {delimiter ',' null '' escape '\' quote '''}. Each backslash and
+ * apostrophe in a string must be escaped and each string must be
+ * prepended with an 'E' denoting an "escape syntax" string.
+ *
+ * Usage note: A field value containing an apostrophe followed by a space
+ * will throw this algorithm off -- it presumes no embedded spaces.
+ *
+ * Note that every backslash in src is doubled, whether or not it is
+ * inside a string value.
+ *
+ * Returns a palloc'd, NUL-terminated string.
+ */
+static char* escape_fmtopts_string(const char *src)
+{
+ int len = strlen(src);
+ int i;
+ int j;
+ /* each source character emits itself plus at most one prefix ('E' or '\') */
+ char *result = palloc0(len * 2 + 1);
+ /* true while scanning between an opening and a closing apostrophe */
+ bool inString = false;
+
+ for (i = 0, j = 0; i < len; i++)
+ {
+ /* the switch only emits an optional prefix; the character is copied below */
+ switch (src[i])
+ {
+ case '\'':
+ if (inString)
+ {
+ /*
+ * Escape apostrophes *within* the string. If the
+ * apostrophe is at the end of the source string or is
+ * followed by a space, it is presumed to be a closing
+ * apostrophe and is not escaped.
+ */
+ if ((i + 1) == len || src[i + 1] == ' ')
+ inString = false;
+ else
+ result[j++] = '\\';
+ }
+ else
+ {
+ /* opening apostrophe: prefix it with E (escape string syntax) */
+ result[j++] = 'E';
+ inString = true;
+ }
+ break;
+ case '\\':
+ /* double the backslash */
+ result[j++] = '\\';
+ break;
+ }
+
+ /* always copy the source character itself */
+ result[j++] = src[i];
+ }
+
+ result[j] = '\0';
+ return result;
+}
+
+/*
+ * Tokenize a fmtopts string (for use with 'custom' formatters)
+ * i.e. convert it to: a = b, format.
+ * (e.g.: formatter E'fixedwidth_in null E' ' preserve_blanks E'on')
+ *
+ * The input is read as a sequence of "<key> <quoted value>" pairs separated
+ * by single spaces. A leading E before the value's opening apostrophe is
+ * dropped. Parsing stops at the first pair that cannot be completed; a
+ * trailing ',' or '=' left over by that is removed.
+ *
+ * Returns a palloc'd string, or NULL when src is NULL.
+ */
+static char* custom_fmtopts_string(const char *src)
+{
+    int len = 0;
+    char *result = NULL;
+    char *srcdup = NULL;
+    char *srcdup_start = NULL;
+    char *find_res = NULL;
+    int last = 0;
+
+    if (!src)
+        return NULL;
+
+    len = strlen(src);
+    /* each "key value " pair grows by at most 2 characters (" = " and ",") */
+    result = palloc0(len * 2 + 1);
+    srcdup = pstrdup(src);
+    srcdup_start = srcdup;
+
+    while (srcdup)
+    {
+        /* find first word (a) */
+        find_res = strchr(srcdup, ' ');
+        if (!find_res)
+            break;
+        strncat(result, srcdup, (find_res - srcdup));
+        /* skip space */
+        srcdup = find_res + 1;
+        /* remove E if E' */
+        if ((strlen(srcdup) > 2) && (srcdup[0] == 'E') && (srcdup[1] == '\''))
+            srcdup++;
+        /* add " = " */
+        strncat(result, " = ", 3);
+        /* nothing follows the key: do not scan past the terminator */
+        if (srcdup[0] == '\0')
+            break;
+        /*
+         * find second word (b) until second '
+         * find \' combinations and ignore them
+         * NOTE(review): an apostrophe preceded by an escaped backslash (\\')
+         * is also skipped here -- confirm whether that can occur in practice.
+         */
+        find_res = strchr(srcdup + 1, '\'');
+        while (find_res && (*(find_res - 1) == '\\') /* ignore \' */)
+        {
+            find_res = strchr(find_res + 1, '\'');
+        }
+        if (!find_res)
+            break;
+        strncat(result, srcdup, (find_res - srcdup + 1));
+        srcdup = find_res + 1;
+        /* skip space and add ',' */
+        if (srcdup[0] == ' ')
+        {
+            srcdup++;
+            strncat(result, ",", 1);
+        }
+    }
+
+    /*
+     * fix string - remove trailing ',' or '='.
+     * result may be empty (no pair found at all), so check the index first.
+     */
+    last = strlen(result) - 1;
+    if (last >= 0 && (result[last] == ',' || result[last] == '='))
+        result[last] = '\0';
+
+    pfree(srcdup_start);
+    return result;
+}
+
+/*
+ * Logs the fields of an external table entry at DEBUG2 level.
+ * Does nothing when extTable is NULL. A NULL command is printed as "NULL",
+ * and a reject limit type of -1 is printed as 'n'.
+ */
+static void printExtTable(Oid relationOid, ExtTableEntry* extTable)
+{
+    const char *commandText = NULL;
+    char rejectType;
+
+    if (extTable == NULL)
+        return;
+
+    commandText = extTable->command ? extTable->command : "NULL";
+    rejectType = (extTable->rejectlimittype == -1) ? 'n' : extTable->rejectlimittype;
+
+    elog(DEBUG2, "extTable params: oid: %d command: %s, encoding: %d, "
+         "format: %c (%s), error table oid: %d, format options: %s, "
+         "is web: %d, is writable: %d, locations size: %d, "
+         "reject limit: %d, reject limit type: %c",
+         relationOid,
+         commandText,
+         extTable->encoding,
+         extTable->fmtcode,
+         parseFormat(extTable->fmtcode),
+         extTable->fmterrtbl,
+         extTable->fmtopts,
+         extTable->isweb,
+         extTable->iswritable,
+         list_length(extTable->locations),
+         extTable->rejectlimit,
+         rejectType);
+}
+
+/*
+ * Prepares a raw string for embedding between E'...' quotes: every
+ * backslash and every apostrophe is doubled ('' is used for apostrophes
+ * rather than \' so the result does not depend on backslash_quote).
+ * Returns a palloc'd, NUL-terminated string.
+ */
+static char* escape_location_literal(const char *src)
+{
+    int len = strlen(src);
+    /* worst case: every character is doubled */
+    char *result = palloc0(len * 2 + 1);
+    int i;
+    int j = 0;
+
+    for (i = 0; i < len; i++)
+    {
+        if (src[i] == '\\' || src[i] == '\'')
+            result[j++] = src[i];
+        result[j++] = src[i];
+    }
+    result[j] = '\0';
+    return result;
+}
+
+/*
+ * This method returns an SQL command to create a PXF table
+ * which is a copy of a given PXF table relationOid, with the following changes:
+ * - the new table is named sampleSchemaName.pxfSampleTable
+ * - LOCATION part is appended 2 attributes - STATS-SAMPLE-RATIO and
+ *   STATS-MAX-FRAGMENTS.
+ * - if the original table has a reject limit - SEGMENT REJECT LIMIT 25 PERCENT
+ *
+ * Only the first location of the original table is used. It is escaped
+ * (backslashes, apostrophes and unprintable characters) before being
+ * placed inside the E'...' literal, so that a location containing such
+ * characters cannot break or alter the generated statement.
+ *
+ * Input:
+ *     relationOid - relation to be copied
+ *     schemaName - schema name of original table
+ *     tableName - table name of original table
+ *     sampleSchemaName - schema name of new table
+ *     pxfSampleTable - table name of new table
+ *     pxf_sample_ratio - ratio of samplings to be done per fragment
+ *     pxf_max_fragments - max number of fragments to be sampled
+ * Output:
+ *     SQL statement string for creating the new table (palloc'd)
+ */
+static char* createPxfSampleStmt(Oid relationOid,
+        const char* schemaName, const char* tableName,
+        const char* sampleSchemaName, const char* pxfSampleTable,
+        float4 pxf_sample_ratio, int pxf_max_fragments)
+{
+    ExtTableEntry *extTable = GetExtTableEntry(relationOid);
+    StringInfoData str;
+    char* quotedLocation = NULL;
+    char* location = NULL;
+    char* tmpstring = NULL;
+    char* escapedfmt = NULL;
+    char* tabfmt = NULL;
+    char* customfmt = NULL;
+
+    initStringInfo(&str);
+
+    printExtTable(relationOid, extTable);
+
+    /*
+     * Escape backslashes/apostrophes first, then turn unprintables into
+     * \xNN sequences (which must stay single-backslash escapes).
+     */
+    quotedLocation = escape_location_literal(((Value*)list_nth(extTable->locations, 0))->val.str /*pxfLocation*/);
+    location = escape_unprintables(quotedLocation);
+    pfree(quotedLocation);
+
+    appendStringInfo(&str, "CREATE EXTERNAL TABLE %s.%s (LIKE %s.%s) "
+                     "LOCATION(E'%s&STATS-SAMPLE-RATIO=%.4f&STATS-MAX-FRAGMENTS=%d') ",
+                     quote_identifier(sampleSchemaName),
+                     quote_identifier(pxfSampleTable),
+                     quote_identifier(schemaName),
+                     quote_identifier(tableName),
+                     location,
+                     pxf_sample_ratio,
+                     pxf_max_fragments);
+
+    pfree(location);
+
+    /* add FORMAT clause */
+    escapedfmt = escape_fmtopts_string((const char *) extTable->fmtopts);
+    tmpstring = escape_unprintables((const char *) escapedfmt);
+    pfree(escapedfmt);
+    escapedfmt = NULL;
+
+    switch (extTable->fmtcode)
+    {
+        case 't':
+            tabfmt = "text";
+            break;
+        case 'b':
+            /*
+             * b denotes that a custom format is used.
+             * the fmtopts string should be formatted as:
+             * a1 = 'val1',...,an = 'valn'
+             */
+            tabfmt = "custom";
+            customfmt = custom_fmtopts_string(tmpstring);
+            break;
+        default:
+            tabfmt = "csv";
+            break;
+    }
+    /* fall back to the plain escaped options when no custom string was built */
+    appendStringInfo(&str, "FORMAT '%s' (%s) ",
+                     tabfmt,
+                     customfmt ? customfmt : tmpstring);
+    pfree(tmpstring);
+    tmpstring = NULL;
+    if (customfmt)
+    {
+        pfree(customfmt);
+        customfmt = NULL;
+    }
+
+    /* add ENCODING clause */
+    appendStringInfo(&str, "ENCODING '%s' ", pg_encoding_to_char(extTable->encoding));
+
+    /* add error control clause */
+    if (extTable->rejectlimit != -1)
+    {
+        appendStringInfo(&str, "%s", "SEGMENT REJECT LIMIT 25 PERCENT ");
+    }
+
+    elog(DEBUG2, "createPxfSampleStmt SQL statement: %s", str.data);
+
+    return str.data;
+}
+
+/*
+ * Returns the number of tuples in the first fragment of given
+ * PXF table.
+ * This is done by creating a copy of the PXF table in pg_temp, with
+ * additional parameters limiting the query to the first fragment only
+ * (max fragments = 1, sample ratio = 1.0), running a COUNT query on it
+ * and dropping the copy afterwards.
+ * The tuple count result is returned. Errors out if the copy cannot be
+ * found after it was created.
+ *
+ * Input:
+ *     relationOid - relation to be sampled
+ */
+static float4 getPxfFragmentTupleCount(Oid relationOid)
+{
+    const char *schemaName = get_namespace_name(get_rel_namespace(relationOid)); /* must be pfreed */
+    const char *tableName = get_rel_name(relationOid); /* must be pfreed */
+    char *sampleSchemaName = pstrdup("pg_temp");
+    char *pxfEstimateTable = temporarySampleTableName(relationOid, "pg_analyze_pxf_est"); /* must be pfreed */
+    Oid pxfEstimateTableOid = InvalidOid;
+    RangeVar *rangeVar = NULL;
+    float4 ntuples = -1.0;
+
+    /* build copy of original pxf table */
+    buildPxfTableCopy(relationOid,
+                      1.0, /* get all tuples */
+                      1, /* query only first fragment */
+                      schemaName, tableName,
+                      sampleSchemaName, pxfEstimateTable);
+
+    rangeVar = makeRangeVar(NULL /*catalogname*/, sampleSchemaName, pxfEstimateTable, -1);
+    pxfEstimateTableOid = RangeVarGetRelid(rangeVar, true /* failOK */, false /*allowHcatalog*/);
+
+    if (pxfEstimateTableOid == InvalidOid)
+    {
+        elog(ERROR, "Unable to create a copy of PXF table %s.%s", schemaName, tableName);
+    }
+
+    /* run count query */
+    ntuples = countFirstFragmentTuples(sampleSchemaName, pxfEstimateTable);
+
+    /* the Oid (not the name pointer) is what must be valid for the drop */
+    Assert(pxfEstimateTableOid != InvalidOid);
+
+    /* Oid is unsigned, hence %u */
+    elog(DEBUG2, "ANALYZE dropping PXF estimate table %s.%s (%u)",
+         sampleSchemaName, pxfEstimateTable, pxfEstimateTableOid);
+    dropSampleTable(pxfEstimateTableOid, true);
+
+    pfree((void *) rangeVar);
+    pfree((void *) pxfEstimateTable);
+    pfree((void *) tableName);
+    pfree((void *) schemaName);
+    pfree((void *) sampleSchemaName);
+
+    return ntuples;
+}
+
+/*
+ * Runs "select count(*)::float4" on schemaName.tableName through SPI and
+ * returns the value handed back by the result callback. The result stays
+ * at -1.0 if the callback does not set it.
+ */
+static float4 countFirstFragmentTuples(const char* schemaName,
+        const char* tableName)
+{
+    StringInfoData countQuery;
+    float tupleCount = -1.0;
+
+    initStringInfo(&countQuery);
+    appendStringInfo(&countQuery, "select count(*)::float4 from %s.%s",
+                     quote_identifier(schemaName),
+                     quote_identifier(tableName));
+
+    /* in case of PXF error, analyze on this table will be reverted */
+    spiExecuteWithCallback(countQuery.data, false /*readonly*/, 0 /*tcount */,
+                           spiCallback_getSingleResultRowColumnAsFloat4, &tupleCount);
+
+    pfree(countQuery.data);
+
+    elog(DEBUG3, "count() of first pxf fragment gives %f values.", tupleCount);
+
+    return tupleCount;
+}
+
+/* --------------------------------
+ * getFragmentStats -
+ *
+ * Asks the PXF service (get_pxf_fragments_statistics) for the fragment
+ * statistics of an external PXF table and copies them to the output
+ * parameters: number of fragments, size of the first fragment and total
+ * size of the datasource. Errors out when no statistics are returned.
+ * --------------------------------
+ */
+static void getFragmentStats(Relation rel, StringInfo location,
+        float4 *numfrags, float4 *firstfragsize,
+        float4 *totalsize)
+{
+    PxfFragmentStatsElem *stats = get_pxf_fragments_statistics(location->data, rel);
+
+    /* a NULL result is treated as a failure and reported as an error */
+    if (stats == NULL)
+    {
+        elog(ERROR, "No statistics were returned for relation %s", RelationGetRelationName(rel));
+    }
+
+    *totalsize = stats->totalSize;
+    *firstfragsize = stats->firstFragSize;
+    *numfrags = stats->numFrags;
+    pfree(stats);
+
+    elog(DEBUG2, "ANALYZE estimate for PXF table %s: fragments %f, first frag size %f, "
+         "total size %f [max int %d]",
+         RelationGetRelationName(rel), *numfrags, *firstfragsize, *totalsize, INT_MAX);
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/pxfmasterapi.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfmasterapi.c b/src/backend/access/external/pxfmasterapi.c
index ba505b3..b1a6240 100644
--- a/src/backend/access/external/pxfmasterapi.c
+++ b/src/backend/access/external/pxfmasterapi.c
@@ -30,11 +30,11 @@
#include "catalog/hcatalog/externalmd.h"
static List* parse_datanodes_response(List *rest_srvrs, StringInfo rest_buf);
-static PxfStatsElem* parse_get_stats_response(StringInfo rest_buf);
+static PxfFragmentStatsElem *parse_get_frag_stats_response(StringInfo rest_buf);
+static float4 normalize_size(long size, char* unit);
static List* parse_get_fragments_response(List* fragments, StringInfo rest_buf);
static void ha_failover(GPHDUri *hadoop_uri, ClientContext *client_context, char* rest_msg);
-static void rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *
-);
+static void rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *rest_msg);
static char* concat(char *body, char *tail);
/*
@@ -72,14 +72,14 @@ parse_datanodes_response(List *rest_srvrs, StringInfo rest_buf)
* Wrap the REST call with a retry for the HA HDFS scenario
*/
static void
-rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *restMsg)
+rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *rest_msg)
{
Assert(hadoop_uri->host != NULL && hadoop_uri->port != NULL);
/* construct the request */
PG_TRY();
{
- call_rest(hadoop_uri, client_context, restMsg);
+ call_rest(hadoop_uri, client_context, rest_msg);
}
PG_CATCH();
{
@@ -93,7 +93,7 @@ rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *restMsg)
if (!elog_dismiss(DEBUG5))
PG_RE_THROW(); /* hope to never get here! */
- ha_failover(hadoop_uri, client_context, restMsg);
+ ha_failover(hadoop_uri, client_context, rest_msg);
}
else /*This is not HA - so let's re-throw */
PG_RE_THROW();
@@ -140,77 +140,83 @@ void free_datanode_rest_server(PxfServer* srv)
}
/*
- * Fetch the statistics from the PXF service
+ * Fetch fragment statistics from the PXF service
*/
-PxfStatsElem *get_data_statistics(GPHDUri* hadoop_uri,
- ClientContext *client_context,
- StringInfo err_msg)
+PxfFragmentStatsElem *get_fragments_statistics(GPHDUri* hadoop_uri,
+ ClientContext *client_context)
{
- char *restMsg = concat("http://%s:%s/%s/%s/Analyzer/getEstimatedStats?path=", hadoop_uri->data);
+ char *restMsg = concat("http://%s:%s/%s/%s/Fragmenter/getFragmentsStats?path=", hadoop_uri->data);
/* send the request. The response will exist in rest_buf.data */
- PG_TRY();
- {
- rest_request(hadoop_uri, client_context, restMsg);
- }
- PG_CATCH();
- {
- /*
- * communication problems with PXF service
- * Statistics for a table can be done as part of an ANALYZE procedure on many tables,
- * and we don't want to stop because of a communication error. So we catch the exception,
- * append its error to err_msg, and return a NULL,
- * which will force the the analyze code to use former calculated values or defaults.
- */
- if (err_msg)
- {
- char* message = elog_message();
- if (message)
- appendStringInfo(err_msg, "%s", message);
- else
- appendStringInfo(err_msg, "Unknown error");
- }
-
- /* release error state */
- if (!elog_dismiss(DEBUG5))
- PG_RE_THROW(); /* hope to never get here! */
-
- return NULL;
- }
- PG_END_TRY();
+ rest_request(hadoop_uri, client_context, restMsg);
- /* parse the JSON response and form a fragments list to return */
- return parse_get_stats_response(&(client_context->the_rest_buf));
+ /* parse the JSON response and form a statistics struct to return */
+ return parse_get_frag_stats_response(&(client_context->the_rest_buf));
}
/*
- * Parse the json response from the PXF Fragmenter.getSize
+ * Parse the json response from the PXF Fragmenter.getFragmentsStats
*/
-static PxfStatsElem *parse_get_stats_response(StringInfo rest_buf)
+static PxfFragmentStatsElem *parse_get_frag_stats_response(StringInfo rest_buf)
{
- PxfStatsElem* statsElem = (PxfStatsElem*)palloc0(sizeof(PxfStatsElem));
+ PxfFragmentStatsElem* statsElem = (PxfFragmentStatsElem*)palloc0(sizeof(PxfFragmentStatsElem));
struct json_object *whole = json_tokener_parse(rest_buf->data);
if ((whole == NULL) || is_error(whole))
{
elog(ERROR, "Failed to parse statistics data from PXF");
}
- struct json_object *head = json_object_object_get(whole, "PXFDataSourceStats");
-
- /* 0. block size */
- struct json_object *js_block_size = json_object_object_get(head, "blockSize");
- statsElem->blockSize = json_object_get_int(js_block_size);
-
- /* 1. number of blocks */
- struct json_object *js_num_blocks = json_object_object_get(head, "numberOfBlocks");
- statsElem->numBlocks = json_object_get_int(js_num_blocks);
-
- /* 2. number of tuples */
- struct json_object *js_num_tuples = json_object_object_get(head, "numberOfTuples");
- statsElem->numTuples = json_object_get_int(js_num_tuples);
+ struct json_object *head = json_object_object_get(whole, "PXFFragmentsStats");
+
+ /* 0. number of fragments */
+ struct json_object *js_num_fragments = json_object_object_get(head, "fragmentsNumber");
+ statsElem->numFrags = json_object_get_int(js_num_fragments);
+
+ /* 1. first fragment size */
+ struct json_object *js_first_frag_size = json_object_object_get(head, "firstFragmentSize");
+ struct json_object *js_size = json_object_object_get(js_first_frag_size, "size");
+ long size = json_object_get_int(js_size);
+ struct json_object *js_unit = json_object_object_get(js_first_frag_size, "unit");
+ char* unit = pstrdup(json_object_get_string(js_unit));
+ statsElem->firstFragSize = normalize_size(size, unit);
+ pfree(unit);
+
+ /* 2. total size */
+ struct json_object *js_total_size = json_object_object_get(head, "totalSize");
+ js_size = json_object_object_get(js_total_size, "size");
+ size = json_object_get_int(js_size);
+ js_unit = json_object_object_get(js_total_size, "unit");
+ unit = pstrdup(json_object_get_string(js_unit));
+ statsElem->totalSize = normalize_size(size, unit);
+ pfree(unit);
return statsElem;
}
+/*
+ * Scale a size given in B, KB, MB, GB or TB to its equivalent in B,
+ * using a factor of 1024 per step. Any other unit string yields -1.
+ */
+static float4 normalize_size(long size, char* unit) {
+    /* recognised units, smallest first; each is 1024 times the previous */
+    static const char *const units[] = {"B", "KB", "MB", "GB", "TB"};
+    const float4 multiplier = 1024.0;
+    const int nunits = sizeof(units) / sizeof(units[0]);
+    float4 scaled = size;
+    int i;
+
+    for (i = 0; i < nunits; i++)
+    {
+        if (strcmp(unit, units[i]) == 0)
+        {
+            return scaled;
+        }
+        /* not this unit: scale up for the next larger one */
+        scaled *= multiplier;
+    }
+
+    return -1;
+}
+
/*
* ha_failover
*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/Makefile b/src/backend/access/external/test/Makefile
index e13d107..5778958 100644
--- a/src/backend/access/external/test/Makefile
+++ b/src/backend/access/external/test/Makefile
@@ -1,7 +1,7 @@
subdir=src/backend/access/external
top_builddir=../../../../..
-TARGETS=pxfuriparser hd_work_mgr pxfheaders ha_config pxffilters pxfmasterapi
+TARGETS=pxfuriparser hd_work_mgr pxfheaders ha_config pxffilters pxfmasterapi pxfanalyze
# Objects from backend, which don't need to be mocked but need to be linked.
COMMON_REAL_OBJS=\
@@ -59,7 +59,11 @@ pxfmasterapi_REAL_OBJS=$(COMMON_REAL_OBJS) \
$(top_srcdir)/src/backend/utils/fmgrtab.o
pxffilters_REAL_OBJS=$(COMMON_REAL_OBJS) \
$(top_srcdir)/src/backend/optimizer/util/clauses.o \
- $(top_srcdir)/src/backend/parser/parse_expr.o
+ $(top_srcdir)/src/backend/parser/parse_expr.o
+pxfanalyze_REAL_OBJS=$(COMMON_REAL_OBJS) \
+ $(top_srcdir)/src/backend/utils/adt/ruleutils.o \
+ $(top_srcdir)/src/backend/parser/kwlookup.o \
+ $(top_srcdir)/src/backend/utils/mb/encnames.o
include ../../../../Makefile.mock
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/README.txt
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/README.txt b/src/backend/access/external/test/README.txt
index 75a03f0..dfe73b8 100644
--- a/src/backend/access/external/test/README.txt
+++ b/src/backend/access/external/test/README.txt
@@ -1,6 +1,9 @@
Directory with the following System Under Test (SUT):
- - pxfuriparser.c
- - hd_work_mgr.c
- - pxfheaders.c
- ha_config.c
+ - hd_work_mgr.c
+ - pxfanalyze.c
- pxffilters.c
+ - pxfheaders.c
+ - pxfmasterapi.c
+ - pxfuriparser.c
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
index ce8e61d..91bfa43 100644
--- a/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
+++ b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
@@ -28,15 +28,17 @@
/*
* check element list_index in segmenet_list
- * has the expected hostip.
+ * has the expected hostip and segindex.
*/
void check_segment_info(List* segment_list, int list_index,
- const char* expected_hostip)
+ const char* expected_hostip,
+ int expected_segindex)
{
Segment* seg_info =
(Segment*)(list_nth(segment_list, list_index));
assert_string_equal(seg_info->hostip, expected_hostip);
+ assert_int_equal(seg_info->segindex, expected_segindex);
}
/*
@@ -76,9 +78,9 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
gphost = (GpHost*)lfirst(cell);
assert_string_equal(gphost->ip, array_of_segs[0]);
assert_int_equal(list_length(gphost->segs), 4);
- for (int i = 0; i < 4; ++i)
+ for (int i = 0; i < 4; ++i)
{
- check_segment_info(gphost->segs, i, "1.2.3.1");
+ check_segment_info(gphost->segs, i, "1.2.3.1", i);
}
cell = list_nth_cell(groups, 1);
@@ -87,7 +89,7 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
assert_int_equal(list_length(gphost->segs), 3);
for (int i = 0; i < 3; ++i)
{
- check_segment_info(gphost->segs, i, "1.2.3.2");
+ check_segment_info(gphost->segs, i, "1.2.3.2", i+4);
}
cell = list_nth_cell(groups, 2);
@@ -96,9 +98,8 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
assert_int_equal(list_length(gphost->segs), 3);
for (int i = 0; i < 3; ++i)
{
- check_segment_info(gphost->segs, i, "1.2.3.3");
+ check_segment_info(gphost->segs, i, "1.2.3.3", i+7);
}
freeQueryResource();
}
-