You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by nh...@apache.org on 2015/11/20 22:50:39 UTC

[1/3] incubator-hawq git commit: HAWQ-44. Advanced statistics for PXF tables.

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 4dbb3479f -> 81385f09f


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/pxfanalyze_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/pxfanalyze_test.c b/src/backend/access/external/test/pxfanalyze_test.c
new file mode 100644
index 0000000..a70c470
--- /dev/null
+++ b/src/backend/access/external/test/pxfanalyze_test.c
@@ -0,0 +1,171 @@
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include "cmockery.h"
+
+#include "c.h"
+#include "../pxfanalyze.c"
+#include "catalog/pg_exttable.h"
+
+
+static void runTest__calculateSamplingRatio(float4 relTuples, float4 relFrags, float4 requestedSampleSize,
+		int maxFrags, float4 expectedResult);
+static void runTest__createPxfSampleStmt(float4 pxf_sample_ratio,
+		const char* expectedRatio,
+		char fmtcode,
+		const char* expectedFmt,
+		const char* fmtopts,
+		const char* expectedFmtopts,
+		int rejectLimit);
+
+
+static void runTest__calculateSamplingRatio(float4 relTuples, float4 relFrags, float4 requestedSampleSize,
+		int maxFrags, float4 expectedResult)
+{
+	int pxf_stat_max_fragments_orig = pxf_stat_max_fragments;
+
+	pxf_stat_max_fragments = maxFrags;
+
+	float4 result = calculateSamplingRatio(relTuples, relFrags, requestedSampleSize);
+
+	pxf_stat_max_fragments = pxf_stat_max_fragments_orig;
+
+	assert_true(fabs(expectedResult - result) <= 0.00001);
+}
+
+void
+test__calculateSamplingRatio__relFragsLTmaxFrags(void **state)
+{
+	/*
+	 * original ratio: 20,000/100,000 = 0.20
+	 */
+	runTest__calculateSamplingRatio(100000, 1000, 20000, 1500, 0.2);
+}
+
+void
+test__calculateSamplingRatio__relFragsGTmaxFrags(void **state)
+{
+	/*
+	 * original ratio: 20,000/100,000 = 0.20
+	 * corrected ratio: 0.20*(1000/900) ~ 0.22
+	 */
+	runTest__calculateSamplingRatio(100000, 1000, 20000, 900, 0.222223);
+}
+
+void
+test__calculateSamplingRatio__ratioGT1(void **state)
+{
+	/*
+	 * original ratio: 20,000/100,000 = 0.20
+	 * corrected ratio: 0.20*(1000/100)=2.0 -> 1.0
+	 */
+	runTest__calculateSamplingRatio(100000, 1000, 20000, 100, 1.0);
+}
+
+void
+test__calculateSamplingRatio__ratioTooLow(void **state)
+{
+	/*
+	 * original ratio: 2,000/100,000,000 = 0.00002
+	 * corrected ratio: 0.0001
+	 */
+	runTest__calculateSamplingRatio(100000000, 1000, 2000, 1000, 0.0001);
+}
+
+static void runTest__createPxfSampleStmt(float4 pxf_sample_ratio,
+		const char* expectedRatio,
+		char fmtcode,
+		const char* expectedFmt,
+		const char* fmtopts,
+		const char* expectedFmtopts,
+		int rejectLimit)
+{
+		/* input */
+		Oid relationOid = 13;
+		const char* schemaName = "orig_schema";
+		const char* tableName = "orig_table";
+		const char* sampleSchemaName = "sample_schema";
+		const char* pxfSampleTable = "pxf_sample_table";
+
+		int pxf_max_fragments = 1000;
+
+		const char* location = "the_table-s_location";
+
+		Value* locationValue = (Value *) palloc0(sizeof(Value));
+		locationValue->type = T_String;
+		locationValue->val.str = location;
+
+		ExtTableEntry *extTable = (ExtTableEntry *) palloc0(sizeof(ExtTableEntry));
+		extTable->encoding = 6;
+		extTable->fmtcode = fmtcode;
+		extTable->fmtopts = fmtopts;
+		extTable->rejectlimit = rejectLimit;
+		extTable->locations = lappend(extTable->locations, locationValue);
+
+		/* get fake external table details */
+		expect_value(GetExtTableEntry, relid, relationOid);
+		will_return(GetExtTableEntry, extTable);
+
+		char* expectedResult = palloc0(1024);
+		sprintf(expectedResult,
+				"CREATE EXTERNAL TABLE %s.%s (LIKE %s.%s) "
+				"LOCATION(E'%s&STATS-SAMPLE-RATIO=%s&STATS-MAX-FRAGMENTS=%d') "
+				"FORMAT '%s' (%s) "
+				"ENCODING 'UTF8' "
+				"%s",
+				sampleSchemaName, pxfSampleTable, schemaName, tableName,
+				location, expectedRatio, pxf_max_fragments,
+				expectedFmt, expectedFmtopts,
+				(rejectLimit != -1) ? "SEGMENT REJECT LIMIT 25 PERCENT " : "");
+
+		char* result = createPxfSampleStmt(relationOid, schemaName, tableName, sampleSchemaName, pxfSampleTable,
+				pxf_sample_ratio, pxf_max_fragments);
+
+		assert_string_equal(expectedResult, result);
+
+		pfree(locationValue);
+		pfree(extTable);
+		pfree(expectedResult);
+		pfree(result);
+}
+
+void
+test__createPxfSampleStmt__textFormat(void **state)
+{
+	const char* fmtopts = "delimiter ',' null '\\N' escape '\\'";
+	const char* expectedFmtopts = "delimiter E',' null E'\\\\N' escape E'\\\\'";
+	runTest__createPxfSampleStmt(0.12, "0.1200", 't', "text", fmtopts, expectedFmtopts, 30);
+}
+
+void
+test__createPxfSampleStmt__customFormatNoRejectLimit(void **state)
+{
+	const char* fmtopts = "formatter 'pxfwritable_import'";
+	const char* expectedFmtopts = "formatter = 'pxfwritable_import'";
+	runTest__createPxfSampleStmt(0.5555555, "0.5556", 'b', "custom", fmtopts, expectedFmtopts, -1);
+}
+
+void
+test__createPxfSampleStmt__csvFormatUnprintableOptions(void **state)
+{
+	const char* fmtopts = "delimiter '\x01' null '\\N' escape '\x02\x03'";
+	const char* expectedFmtopts = "delimiter E'\\x01' null E'\\\\N' escape E'\\x02\\x03'";
+	runTest__createPxfSampleStmt(0.003, "0.0030", 'c', "csv", fmtopts, expectedFmtopts, 100);
+}
+
+int
+main(int argc, char* argv[])
+{
+	cmockery_parse_arguments(argc, argv);
+
+	const UnitTest tests[] = {
+			unit_test(test__calculateSamplingRatio__relFragsLTmaxFrags),
+			unit_test(test__calculateSamplingRatio__relFragsGTmaxFrags),
+			unit_test(test__calculateSamplingRatio__ratioGT1),
+			unit_test(test__calculateSamplingRatio__ratioTooLow),
+			unit_test(test__createPxfSampleStmt__textFormat),
+			unit_test(test__createPxfSampleStmt__customFormatNoRejectLimit),
+			unit_test(test__createPxfSampleStmt__csvFormatUnprintableOptions)
+	};
+	return run_tests(tests);
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/pxfmasterapi_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/pxfmasterapi_test.c b/src/backend/access/external/test/pxfmasterapi_test.c
index dbabb5b..816b277 100644
--- a/src/backend/access/external/test/pxfmasterapi_test.c
+++ b/src/backend/access/external/test/pxfmasterapi_test.c
@@ -234,6 +234,24 @@ test__rest_request__callRestHASuccessFromTheFirstCall(void **state)
 	pfree(client_context);
 }
 
+void
+test__normalize_size(void **state)
+{
+	float4 result = normalize_size(10000000, "B");
+	assert_int_equal(result, 10000000);
+
+	result = normalize_size(10000000, "KB");
+	assert_int_equal(result, 10240000000);
+
+	result = normalize_size(500, "MB");
+	assert_int_equal(result, 524288000);
+
+	result = normalize_size(10, "GB");
+	assert_int_equal(result, 10737418240);
+
+	result = normalize_size(10000, "TB");
+	assert_int_equal(result, 10995116277760000);
+}
 
 int 
 main(int argc, char *argv[]) 
@@ -244,7 +262,8 @@ main(int argc, char *argv[])
 		    unit_test(test__rest_request__callRestThrowsNoHA),
 		    unit_test(test__rest_request__callRestThrowsHAFirstTime),
 		    unit_test(test__rest_request__callRestThrowsHASecondTime),
-		    unit_test(test__rest_request__callRestHASuccessFromTheFirstCall)
+		    unit_test(test__rest_request__callRestHASuccessFromTheFirstCall),
+			unit_test(test__normalize_size)
 	};
 	return run_tests(tests);
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/commands/analyze.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 67fc8d4..fcbedb0 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -28,6 +28,7 @@
 #include "access/pxfuriparser.h"
 #include "access/heapam.h"
 #include "access/hd_work_mgr.h"
+#include "access/pxfanalyze.h"
 #include "catalog/catquery.h"
 #include "catalog/heap.h"
 #include "access/transam.h"
@@ -140,13 +141,12 @@ static bool hasMaxDefined(Oid relationOid, const char *attributeName);
 
 /* Sampling related */
 static float4 estimateSampleSize(Oid relationOid, const char *attributeName, float4 relTuples);
-static char* temporarySampleTableName(Oid relationOid);
-static Oid buildSampleTable(Oid relationOid, 
+static Oid buildSampleTable(Oid relationOid,
+		char* sampleTableName,
 		List *lAttributeNames, 
 		float4	relTuples,
 		float4 	requestedSampleSize, 
 		float4 *sampleTableRelTuples);
-static void dropSampleTable(Oid sampleTableOid);
 
 /* Attribute statistics computation */
 static int4 numberOfMCVEntries(Oid relationOid, const char *attributeName);
@@ -198,25 +198,6 @@ static void updateAttributeStatisticsInCatalog(Oid relationOid, const char *attr
 		AttributeStatistics *stats);
 static void updateReltuplesRelpagesInCatalog(Oid relationOid, float4 relTuples, float4 relPages);
 
-/* Convenience */
-static ArrayType * SPIResultToArray(int resultAttributeNumber, MemoryContext allocationContext);
-
-/* spi execution helpers */
-typedef void (*spiCallback)(void *clientDataOut);
-static void spiExecuteWithCallback(const char *src, bool read_only, long tcount,
-           spiCallback callbackFn, void *clientData);
-
-typedef struct
-{
-    int numColumns;
-    MemoryContext memoryContext;
-    ArrayType ** output;
-} EachResultColumnAsArraySpec;
-
-static void spiCallback_getEachResultColumnAsArray(void *clientData);
-static void spiCallback_getProcessedAsFloat4(void *clientData);
-static void spiCallback_getSingleResultRowColumnAsFloat4(void *clientData);
-
 /**
  * Extern stuff.
  */
@@ -604,9 +585,9 @@ void analyzeStmt(VacuumStmt *stmt, List *relids)
 						FaultInjector_InjectFaultIfSet(
 								AnalyzeSubxactError,
 								DDLNotSpecified,
-								"",  // databaseName
-								""); // tableName
-#endif // FAULT_INJECTOR
+								"",  /* databaseName */
+								""); /* tableName */
+#endif /* FAULT_INJECTOR */
 
 						ReleaseCurrentSubTransaction();
 						MemoryContextSwitchTo(oldcontext);
@@ -883,7 +864,7 @@ static List *analyzableAttributes(Relation candidateRelation)
 				|| attr->atttypid == UNKNOWNOID))
 		{
 			char	*attName = NULL;
-			attName = pstrdup(NameStr(attr->attname)); //needs to be pfree'd by caller
+			attName = pstrdup(NameStr(attr->attname)); /* needs to be pfree'd by caller */
 			Assert(attName);
 			lAttNames = lappend(lAttNames, (void *) attName);
 		}
@@ -932,16 +913,7 @@ static void analyzeRelation(Relation relation, List *lAttributeNames, bool rooto
 	}
 	else
 	{
-		initStringInfo(&err_msg);
-		gp_statistics_estimate_reltuples_relpages_external_pxf(relation, &location, &estimatedRelTuples, &estimatedRelPages, &err_msg);
-		if (err_msg.len > 0)
-		{
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- error returned: %s",
-							RelationGetRelationName(relation),
-							err_msg.data)));
-		}
-		pfree(err_msg.data);
+		analyzePxfEstimateReltuplesRelpages(relation, &location, &estimatedRelTuples, &estimatedRelPages);
 	}
 	pfree(location.data);
 	
@@ -987,15 +959,6 @@ static void analyzeRelation(Relation relation, List *lAttributeNames, bool rooto
 	pgstat_report_analyze(relation, estimatedRelTuples, 0 /*totaldeadrows*/);
 	
 	/**
-	 * For an external PXF table, the next steps are irrelevant - it's time to leave
-	 */
-	if (isExternalPxfReadOnly)
-	{
-		elog(elevel, "ANALYZE on PXF table %s computes only reltuples and relpages.", RelationGetRelationName(relation));
-		return;
-	}
-
-	/**
 	 * Does the relation have any rows. If not, no point analyzing columns.
 	 */
 	if (estimatedRelTuples < 1.0)
@@ -1053,9 +1016,13 @@ static void analyzeRelation(Relation relation, List *lAttributeNames, bool rooto
 	 * Determine if a sample table needs to be created. If reltuples is very small,
 	 * then, we'd rather work off the entire table. Also, if the sample required is
 	 * the size of the table, then we'd rather work off the entire table.
+	 *
+	 * In case of PXF table, we always need a sample table because the various calculations
+	 * should be done locally in HAWQ and not by retrieving the data again and again.
 	 */
-	if (estimatedRelTuples <= gp_statistics_sampling_threshold 
-			|| minSampleTableSize >= estimatedRelTuples) /* maybe this should be K% of reltuples or something? */
+	if (!isExternalPxfReadOnly &&
+			(estimatedRelTuples <= gp_statistics_sampling_threshold
+			|| minSampleTableSize >= estimatedRelTuples)) /* maybe this should be K% of reltuples or something? */
 	{
 		sampleTableRequired = false;
 	}
@@ -1065,11 +1032,33 @@ static void analyzeRelation(Relation relation, List *lAttributeNames, bool rooto
 	 */
 	if (sampleTableRequired)
 	{
-		elog(elevel, "ANALYZE building sample table of size %.0f on table %s because it has too many rows.", minSampleTableSize, RelationGetRelationName(relation));
-		sampleTableOid = buildSampleTable(relationOid, lAttributeNames, estimatedRelTuples, minSampleTableSize, &sampleTableRelTuples);
-		
+		char * sampleTableName = temporarySampleTableName(relationOid, "pg_analyze"); /* must be pfreed */
+
+		elog(elevel, "ANALYZE building sample table of size %.0f on table %s because %s.",
+				minSampleTableSize, RelationGetRelationName(relation),
+				isExternalPxfReadOnly ? "it's a PXF table" : "it has too many rows");
+
+		if (isExternalPxfReadOnly)
+		{
+			sampleTableOid = buildPxfSampleTable(relationOid, sampleTableName, lAttributeNames,
+					estimatedRelTuples, estimatedRelPages, minSampleTableSize,
+					&sampleTableRelTuples);
+		}
+		else
+		{
+			sampleTableOid = buildSampleTable(relationOid, sampleTableName, lAttributeNames,
+					estimatedRelTuples, minSampleTableSize, &sampleTableRelTuples);
+		}
+		/*
+		 * Update the sample table's reltuples, relpages. Without these, the queries to the sample table would call cdbRelsize which can be an expensive call.
+		 * We know the number of tuples in the sample table, but don't have the information about the number of pages. We set it to 2 arbitrarily.
+		 */
+		updateReltuplesRelpagesInCatalog(sampleTableOid, sampleTableRelTuples, 2);
+
 		/* We must have a non-empty sample table */
-		Assert(sampleTableRelTuples > 0.0);	
+		Assert(sampleTableRelTuples > 0.0);
+
+		pfree((void *) sampleTableName);
 	}
 	
 	/**
@@ -1093,7 +1082,7 @@ static void analyzeRelation(Relation relation, List *lAttributeNames, bool rooto
 	if (sampleTableRequired)
 	{
 		elog(elevel, "ANALYZE dropping sample table");
-		dropSampleTable(sampleTableOid);
+		dropSampleTable(sampleTableOid, false);
 	}
 	
 	return;
@@ -1104,14 +1093,14 @@ static void analyzeRelation(Relation relation, List *lAttributeNames, bool rooto
  * This is not super random. However, this should be sufficient for our purpose.
  * Input:
  * 	relationOid 	- relation
- * 	backendId	- pid of the backend.
+ * 	prefix			- sample name prefix
  * Output:
  * 	sample table name. This must be pfree'd by the caller.
  */
-static char* temporarySampleTableName(Oid relationOid)
+char* temporarySampleTableName(Oid relationOid, char* prefix)
 {
 	char tmpname[NAMEDATALEN];
-	snprintf(tmpname, NAMEDATALEN, "pg_analyze_%u_%i", relationOid, MyBackendId);
+	snprintf(tmpname, NAMEDATALEN, "%s_%u_%i", prefix, relationOid, MyBackendId);
 	return pstrdup(tmpname);
 }
 
@@ -1217,18 +1206,18 @@ static float4 estimateSampleSize(Oid relationOid, const char *attributeName, flo
  * 	read_only - is it a read-only call?
  * 	tcount - execution tuple-count limit, or 0 for none
  * 	callbackFn - callback function to be executed once SPI is done.
- * 	clientData - argument to call back function (usually pointer to data-structure 
+ * 	clientData - argument to call back function (usually pointer to data-structure
  * 				that the callback function populates).
- * 
+ *
  */
-static void spiExecuteWithCallback(
+void spiExecuteWithCallback(
 		const char *src,
 		bool read_only,
 		long tcount,
 		spiCallback callbackFn,
 		void *clientData)
 {
-	bool connected = false;
+	volatile bool connected = false; /* needs to be volatile when accessed by PG_CATCH */
 	int ret = 0;
 
 	PG_TRY();
@@ -1236,11 +1225,11 @@ static void spiExecuteWithCallback(
 		if (SPI_OK_CONNECT != SPI_connect())
 		{
 			ereport(ERROR, (errcode(ERRCODE_CDB_INTERNAL_ERROR),
-					errmsg("Unable to connect to execute internal query.")));
+					errmsg("Unable to connect to execute internal query: %s.", src)));
 		}
 		connected = true;
 
-		elog(elevel, "Executing SQL: %s", src);
+		elog(DEBUG2, "Executing SQL: %s", src);
 		
 		/* Do the query. */
 		ret = SPI_execute(src, read_only, tcount);
@@ -1251,13 +1240,17 @@ static void spiExecuteWithCallback(
 			callbackFn(clientData);
 		}
 		connected = false;
-		SPI_finish();
+		int res = SPI_finish();
+		elog(DEBUG5, "finish SPI %s, res %d, ret %d", src, res, ret);
 	}
 	/* Clean up in case of error. */
 	PG_CATCH();
 	{
 		if (connected)
-			SPI_finish();
+		{
+			int res = SPI_finish();
+			elog(DEBUG5, "finish SPI %s after error, res %d, ret %d", src, res, ret);
+		}
 
 		/* Carry on with error handling. */
 		PG_RE_THROW();
@@ -1269,15 +1262,15 @@ static void spiExecuteWithCallback(
  * A callback function for use with spiExecuteWithCallback.  Asserts that exactly one row was returned.
  *  Gets the row's first column as a float, using 0.0 if the value is null
  */
-static void spiCallback_getSingleResultRowColumnAsFloat4(void *clientData)
+void spiCallback_getSingleResultRowColumnAsFloat4(void *clientData)
 {
 	Datum datum_f;
 	bool isnull = false;
 	float4 *out = (float4*) clientData;
 
-    Assert(SPI_tuptable != NULL); // must have result
-    Assert(SPI_processed == 1); //we expect only one tuple.
-	Assert(SPI_tuptable->tupdesc->attrs[0]->atttypid == FLOAT4OID); // must be float4
+    Assert(SPI_tuptable != NULL); /* must have result */
+    Assert(SPI_processed == 1); /* we expect only one tuple. */
+	Assert(SPI_tuptable->tupdesc->attrs[0]->atttypid == FLOAT4OID); /* must be float4 */
 
 	datum_f = heap_getattr(SPI_tuptable->vals[0], 1, SPI_tuptable->tupdesc, &isnull);
 
@@ -1295,7 +1288,7 @@ static void spiCallback_getSingleResultRowColumnAsFloat4(void *clientData)
  * A callback function for use with spiExecuteWithCallback.  Copies the SPI_processed value into
  *    *clientDataOut, treating it as a float4 pointer.
  */
-static void spiCallback_getProcessedAsFloat4(void *clientData)
+void spiCallback_getProcessedAsFloat4(void *clientData)
 {
     float4 *out = (float4*) clientData;
     *out = (float4)SPI_processed;
@@ -1306,7 +1299,7 @@ static void spiCallback_getProcessedAsFloat4(void *clientData)
  *   The number of arrays, the memory context for them, and the output location are determined by
  *   treating *clientData as a EachResultColumnAsArraySpec and using the values there
  */
-static void spiCallback_getEachResultColumnAsArray(void *clientData)
+void spiCallback_getEachResultColumnAsArray(void *clientData)
 {
     EachResultColumnAsArraySpec * spec = (EachResultColumnAsArraySpec*) clientData;
     int i;
@@ -1334,14 +1327,16 @@ static void spiCallback_getEachResultColumnAsArray(void *clientData)
  * 
  * Input:
  * 	relationOid 	- relation to be sampled
+ * 	sampleTableName - sample table name, moderately unique
  * 	lAttributeNames - attributes to be included in the sample
  * 	relTuples		- estimated size of relation
  * 	requestedSampleSize - as determined by attribute statistics requirements.
- * 	sampleLimit		- limit on size of the sample.
+ * 	sampleTableRelTuples    - limit on size of the sample.
  * Output:
  * 	sampleTableRelTuples - number of tuples in the sample table created.
  */
 static Oid buildSampleTable(Oid relationOid, 
+		char* sampleTableName,
 		List *lAttributeNames, 
 		float4	relTuples,
 		float4 	requestedSampleSize, 
@@ -1354,7 +1349,6 @@ static Oid buildSampleTable(Oid relationOid,
 	const char *schemaName = NULL;
 	const char *tableName = NULL;
 	char	*sampleSchemaName = pstrdup("pg_temp"); 
-	char 	*sampleTableName = NULL;
 	Oid			sampleTableOid = InvalidOid;
 	float4		randomThreshold = 0.0;
 	RangeVar 	*rangeVar = NULL;
@@ -1364,11 +1358,11 @@ static Oid buildSampleTable(Oid relationOid,
 	
 	randomThreshold = requestedSampleSize / relTuples;
 	
-	schemaName = get_namespace_name(get_rel_namespace(relationOid)); //must be pfreed
-	tableName = get_rel_name(relationOid); //must be pfreed
-	sampleTableName = temporarySampleTableName(relationOid); // must be pfreed 
+	schemaName = get_namespace_name(get_rel_namespace(relationOid)); /* must be pfreed */
+	tableName = get_rel_name(relationOid); /* must be pfreed */
 
 	initStringInfo(&str);
+
 	appendStringInfo(&str, "create table %s.%s as (select ", 
 			quote_identifier(sampleSchemaName), quote_identifier(sampleTableName)); 
 	
@@ -1387,7 +1381,7 @@ static Oid buildSampleTable(Oid relationOid,
 		}
 	}
 	
-	// if table is partitioned, we create a sample over all parts
+	/* if table is partitioned, we create a sample over all parts */
 	appendStringInfo(&str, "from %s.%s as Ta where random() < %.38f limit %lu) distributed randomly", 
 			quote_identifier(schemaName), 
 			quote_identifier(tableName), randomThreshold, (unsigned long) requestedSampleSize);
@@ -1419,14 +1413,7 @@ static Oid buildSampleTable(Oid relationOid,
 				quote_identifier(tableName));
 	}
 	
-	/* 
-	 * Update the sample table's reltuples, relpages. Without these, the queries to the sample table would call cdbRelsize which can be an expensive call. 
-	 * We know the number of tuples in the sample table, but don't have the information about the number of pages. We set it to 2 arbitrarily.
-	 */
-	updateReltuplesRelpagesInCatalog(sampleTableOid, *sampleTableRelTuples, 2);
-
 	pfree((void *) rangeVar);
-	pfree((void *) sampleTableName);
 	pfree((void *) tableName);
 	pfree((void *) schemaName);
 	pfree((void *) sampleSchemaName);
@@ -1436,17 +1423,18 @@ static Oid buildSampleTable(Oid relationOid,
 /**
  * Drops the sample table created during ANALYZE.
  */
-static void dropSampleTable(Oid sampleTableOid)
+void dropSampleTable(Oid sampleTableOid, bool isExternal)
 {
 	StringInfoData str;
 	const char *sampleSchemaName = NULL;
 	const char *sampleTableName = NULL;
 
-	sampleSchemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); //must be pfreed
-	sampleTableName = get_rel_name(sampleTableOid); // must be pfreed 	
+	sampleSchemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); /* must be pfreed */
+	sampleTableName = get_rel_name(sampleTableOid); /* must be pfreed */
 
 	initStringInfo(&str);
-	appendStringInfo(&str, "drop table %s.%s", 
+	appendStringInfo(&str, "drop %stable %s.%s",
+			isExternal ? "external " : "",
 			quote_identifier(sampleSchemaName), 
 			quote_identifier(sampleTableName));
 	
@@ -1458,7 +1446,6 @@ static void dropSampleTable(Oid sampleTableOid)
 	pfree((void *)sampleTableName);
 }
 
-
 /**
  * This method determines the number of pages corresponding to an index.
  * Input:
@@ -1766,8 +1753,8 @@ static float4 analyzeComputeNDistinctAbsolute(Oid sampleTableOid,
 	const char *sampleSchemaName = NULL;
 	const char *sampleTableName = NULL;
 	
-	sampleSchemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); //must be pfreed
-	sampleTableName = get_rel_name(sampleTableOid); // must be pfreed 	
+	sampleSchemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); /* must be pfreed */
+	sampleTableName = get_rel_name(sampleTableOid); /* must be pfreed */
 
 	initStringInfo(&str);
 	appendStringInfo(&str, "select count(*)::float4 from (select Ta.%s from %s.%s as Ta group by Ta.%s) as Tb",
@@ -1807,14 +1794,14 @@ static float4 analyzeComputeNRepeating(Oid relationOid,
 	const char *sampleSchemaName = NULL;
 	const char *sampleTableName = NULL;
 	
-	sampleSchemaName = get_namespace_name(get_rel_namespace(relationOid)); //must be pfreed
-	sampleTableName = get_rel_name(relationOid); // must be pfreed 	
+	sampleSchemaName = get_namespace_name(get_rel_namespace(relationOid)); /* must be pfreed */
+	sampleTableName = get_rel_name(relationOid); /* must be pfreed */
 
 	initStringInfo(&str);
 	appendStringInfo(&str, "select count(v)::float4 from (select Ta.%s as v, count(Ta.%s) as f from %s.%s as Ta group by Ta.%s) as foo where f > 1",
-			quote_identifier(attributeName), 
 			quote_identifier(attributeName),
-			quote_identifier(sampleSchemaName), 
+			quote_identifier(attributeName),
+			quote_identifier(sampleSchemaName),
 			quote_identifier(sampleTableName),
 			quote_identifier(attributeName));
 
@@ -1838,7 +1825,7 @@ static float4 analyzeComputeNRepeating(Oid relationOid,
  * Output:
  * 	array of attribute type
  */
-static ArrayType * SPIResultToArray(int resultAttributeNumber, MemoryContext allocationContext)
+ArrayType * SPIResultToArray(int resultAttributeNumber, MemoryContext allocationContext)
 {
 	ArrayType *result = NULL;
 	int i = 0;
@@ -1962,8 +1949,8 @@ static float4 analyzeNullCount(Oid sampleTableOid, Oid relationOid, const char *
 		const char *schemaName = NULL;
 		const char *tableName = NULL;
 
-		schemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); //must be pfreed
-		tableName = get_rel_name(sampleTableOid); // must be pfreed
+		schemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); /* must be pfreed */
+		tableName = get_rel_name(sampleTableOid); /* must be pfreed */
 
 		initStringInfo(&str);
 		appendStringInfo(&str, "select count(*)::float4 from %s.%s as Ta where Ta.%s is null",
@@ -2064,8 +2051,8 @@ static float4 analyzeComputeAverageWidth(Oid sampleTableOid,
 		const char *sampleSchemaName = NULL;
 		const char *sampleTableName = NULL;
 
-		sampleSchemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); //must be pfreed
-		sampleTableName = get_rel_name(sampleTableOid); // must be pfreed
+		sampleSchemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); /* must be pfreed */
+		sampleTableName = get_rel_name(sampleTableOid); /* must be pfreed */
 
 		initStringInfo(&str);
 		appendStringInfo(&str, "select avg(pg_column_size(Ta.%s))::float4 from %s.%s as Ta where Ta.%s is not null",
@@ -2130,8 +2117,8 @@ static void analyzeComputeMCV(Oid relationOid,
 		Assert(relTuples > 0.0);
 		Assert(nEntries > 0);
 
-		sampleSchemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); //must be pfreed
-		sampleTableName = get_rel_name(sampleTableOid); // must be pfreed
+		sampleSchemaName = get_namespace_name(get_rel_namespace(sampleTableOid)); /* must be pfreed */
+		sampleTableName = get_rel_name(sampleTableOid); /* must be pfreed */
 
 		initStringInfo(&str);
 		appendStringInfo(&str, "select Ta.%s as v, count(Ta.%s)::float4/%f::float4 as f from %s.%s as Ta "
@@ -2880,7 +2867,6 @@ static void updateAttributeStatisticsInCatalog(Oid relationOid, const char *attr
 
 }
 
-
 /**
  * This method estimates the number of tuples and pages in a heaptable relation. Getting the number of blocks is straightforward.
  * Estimating the number of tuples is a little trickier. There are two factors that complicate this:
@@ -2966,7 +2952,7 @@ static void gp_statistics_estimate_reltuples_relpages_heap(Relation rel, float4
 			 * could get added to it, but we ignore such tuples.
 			 */
 
-			// -------- MirroredLock ----------
+			/* -------- MirroredLock ---------- */
 			MIRROREDLOCK_BUFMGR_LOCK;
 
 			targbuffer = ReadBuffer(rel, targblock);
@@ -3002,7 +2988,7 @@ static void gp_statistics_estimate_reltuples_relpages_heap(Relation rel, float4
 			UnlockReleaseBuffer(targbuffer);
 
 			MIRROREDLOCK_BUFMGR_UNLOCK;
-			// -------- MirroredLock ----------
+			/* -------- MirroredLock ---------- */
 
 			nblocksseen++;
 		}		
@@ -3107,44 +3093,3 @@ static void gp_statistics_estimate_reltuples_relpages_parquet(Relation rel, floa
 	pfree(fstotal);
 	return;
 }
-
-/* --------------------------------
- *		gp_statistics_estimate_reltuples_relpages_external_pxf -
- *
- *		Fetch reltuples and relpages for an external table which is PXF
- * --------------------------------
- */
-void gp_statistics_estimate_reltuples_relpages_external_pxf(Relation rel, StringInfo location,
-															float4 *reltuples, float4 *relpages,
-															StringInfo err_msg)
-{
-
-	PxfStatsElem *elem = NULL;
-	elem = get_pxf_statistics(location->data, rel, err_msg);
-
-	/*
-	 * if get_pxf_statistics returned NULL - probably a communication error, we fall back to former values
-	 * for the relation (can be default if no analyze was run successfully before)
-	 * we don't want to stop the analyze, since this can be part of a long procedure performed on many tables
-	 * not just this one
-	 */
-	if (!elem)
-	{
-		*relpages = rel->rd_rel->relpages;
-		*reltuples = rel->rd_rel->reltuples;
-		return;
-	}
-	
-	*relpages = floor(( ((float4)elem->blockSize) * elem->numBlocks) / BLCKSZ);
-	*reltuples = elem->numTuples;
-	/* relpages can't be 0 if there are tuples in the table. */
-	if ((*relpages < 1.0) && (*reltuples > 0))
-		*relpages = 1.0;
-	pfree(elem);
-	
-	/* in case there were problems with the PXF service, keep the defaults */
-	if (*relpages < 0)
-		*relpages =  gp_external_table_default_number_of_pages;
-	if (*reltuples < 0)
-		*reltuples =  gp_external_table_default_number_of_tuples;
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/commands/tablecmds.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 547725e..b47bdce 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -895,11 +895,11 @@ static Datum AddDefaultPageRowGroupSize(Datum relOptions, List *defList){
 * In here we first dispatch a normal DefineRelation() (with relstorage
 * external) in order to create the external relation entries in pg_class
 * pg_type etc. Then once this is done we dispatch ourselves (DefineExternalRelation)
-* in order to create the pg_exttable entry accross the gp array and we
+* in order to create the pg_exttable entry across the gp array and we
 * also record a dependency with the error table, if one exists.
 *
 * Why don't we just do all of this in one dispatch run? because that
-* involves duplicating the DefineRelation() code or severly modifying it
+* involves duplicating the DefineRelation() code or severely modifying it
 * to have special cases for external tables. IMHO it's better and cleaner
 * to leave it intact and do another dispatch.
 * ----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/optimizer/util/plancat.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 2258efe..b15b32e 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -21,7 +21,6 @@
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/parquetsegfiles.h"
-#include "access/pxfuriparser.h"
 #include "catalog/catquery.h"
 #include "catalog/gp_policy.h"
 #include "catalog/pg_inherits.h"
@@ -80,9 +79,6 @@ cdb_estimate_rel_size(RelOptInfo   *relOptInfo,
 static void
 cdb_default_stats_warning_for_index(Oid reloid, Oid indexoid);
 
-static bool 
-need_to_get_stats_pxf(Relation rel, StringInfo location, BlockNumber relpages, double reltuples);
-
 extern BlockNumber RelationGuessNumberOfBlocks(double totalbytes);
 
 /*
@@ -395,20 +391,7 @@ cdb_estimate_rel_size(RelOptInfo   *relOptInfo,
 	 * Asking the QE for the size of the relation is a bit expensive.
 	 * Do we want to do it all the time?  Or only for tables that have never had analyze run?
 	 */
-	if (need_to_get_stats_pxf(rel, &location, relpages, reltuples))
-	{
-		/*
-		 * rel is a pxf external table, and it wasn't yet ANALYZE'ed.
-		 */
-
-		float4 tuples, pages;
-		gp_statistics_estimate_reltuples_relpages_external_pxf(rel, &location, &tuples, &pages, NULL);
-		
-		relpages = curpages = pages;
-		reltuples = tuples;
-		pfree(location.data);
-	}	
-	else if (relpages > 0) 
+	if (relpages > 0)
 	{
 
 		/*
@@ -420,7 +403,7 @@ cdb_estimate_rel_size(RelOptInfo   *relOptInfo,
 
 		curpages = relpages;
 	}
-	else /* relpages is 0 and this is a regular table or an external non-PXF table */
+	else /* relpages is 0 and this is a regular table or an external table */
 	{
 
 		/*
@@ -500,24 +483,6 @@ cdb_estimate_rel_size(RelOptInfo   *relOptInfo,
 
 }                               /* cdb_estimate_rel_size */
 
-/* 
- * need_to_get_stats_pxf
- *
- * 1. Table is PXF external table, and
- * 2. ANALYZE was not run on the table, and
- * 3. GUC pxf_enable_stat_collection is on
- */
-static bool need_to_get_stats_pxf(Relation rel,
-								 StringInfo loc, 	
-								 BlockNumber relpages,
-								 double		reltuples)
-{
-	return pxf_enable_stat_collection &&
-		   RelationIsExternalPxfReadOnly(rel, loc) &&
-		   relpages == gp_external_table_default_number_of_pages &&
-		   reltuples == gp_external_table_default_number_of_tuples;
-}
-
 /*
  * estimate_rel_size - estimate # pages and # tuples in a table or index
  *

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 60c48c4..7e7d1b8 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -614,6 +614,7 @@ show_allow_system_table_mods(void);
 /* Extension Framework GUCs */
 bool   pxf_enable_filter_pushdown = true;
 bool   pxf_enable_stat_collection = true;
+int    pxf_stat_max_fragments = 100;
 bool   pxf_enable_locality_optimizations = true;
 bool   pxf_isilon = false; /* temporary GUC */
 int    pxf_service_port = 51200; /* temporary GUC */
@@ -6264,6 +6265,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"pxf_stat_max_fragments", PGC_USERSET, EXTERNAL_TABLES,
+			gettext_noop("Max number of fragments to be sampled during ANALYZE on a PXF table."),
+			NULL,
+			GUC_GPDB_ADDOPT
+		},
+		&pxf_stat_max_fragments,
+		100, 1, INT_MAX, NULL, NULL
+	},
+
+	{
 		{"pxf_service_port", PGC_POSTMASTER, EXTERNAL_TABLES,
 			gettext_noop("PXF service port"),
 			NULL,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/include/access/hd_work_mgr.h
----------------------------------------------------------------------
diff --git a/src/include/access/hd_work_mgr.h b/src/include/access/hd_work_mgr.h
index ea5c8d6..9280df5 100644
--- a/src/include/access/hd_work_mgr.h
+++ b/src/include/access/hd_work_mgr.h
@@ -20,15 +20,15 @@ extern char** map_hddata_2gp_segments(char *uri, int total_segs, int working_seg
 extern void free_hddata_2gp_segments(char **segs_work_map, int total_segs);
 
 /*
- * Structure that describes one Statistics element received from the PXF service
+ * Structure that describes fragments statistics element received from PXF service
  */
-typedef struct sPxfStatsElem
+typedef struct sPxfFragmentStatsElem
 {
-	int   blockSize; /* size of a block size in the PXF target datasource */
-	int   numBlocks;
-	int   numTuples;
-} PxfStatsElem;
-PxfStatsElem *get_pxf_statistics(char *uri, Relation rel, StringInfo err_msg);
+	int numFrags;
+	float4 firstFragSize; /* size of the first fragment */
+	float4 totalSize; /* size of the total datasource */
+} PxfFragmentStatsElem;
+PxfFragmentStatsElem *get_pxf_fragments_statistics(char *uri, Relation rel);
 
 List *get_pxf_hcat_metadata(char *relation_location);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/include/access/pxfanalyze.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfanalyze.h b/src/include/access/pxfanalyze.h
new file mode 100644
index 0000000..d641c66
--- /dev/null
+++ b/src/include/access/pxfanalyze.h
@@ -0,0 +1,37 @@
+/*-------------------------------------------------------------------------
+*
+* pxfanalyze.h
+*	  Helper functions to perform ANALYZE on PXF tables.
+*
+* Copyright (c) 2007-2008, Greenplum inc
+*
+*-------------------------------------------------------------------------
+*/
+#ifndef PXFANALYZE_H
+#define PXFANALYZE_H
+
+#include "c.h"
+#include "utils/rel.h"
+#include "nodes/pg_list.h"
+#include "lib/stringinfo.h"
+
+/*
+ * Creates a sample table with data from a PXF table.
+ */
+extern Oid buildPxfSampleTable(Oid relationOid,
+		char* sampleTableName,
+		List *lAttributeNames,
+		float4	relTuples,
+		float4  relFrags,
+		float4 	requestedSampleSize,
+		float4 *sampleTableRelTuples);
+/*
+ * get tuple count estimate, page count estimate (which is
+ * the number of fragments) of a given PXF table.
+ */
+void analyzePxfEstimateReltuplesRelpages(Relation relation,
+		StringInfo location,
+		float4* estimatedRelTuples,
+		float4* estimatedRelPages);
+
+#endif   /* PXFANALYZE_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/include/access/pxfmasterapi.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfmasterapi.h b/src/include/access/pxfmasterapi.h
index 371d743..020351c 100644
--- a/src/include/access/pxfmasterapi.h
+++ b/src/include/access/pxfmasterapi.h
@@ -55,7 +55,7 @@ typedef struct sFragmentHost
 extern List* get_datanode_rest_servers(GPHDUri *hadoop_uri, ClientContext* client_context);
 extern void free_datanode_rest_servers(List *srvrs);
 extern void free_datanode_rest_server(PxfServer* srv);
-extern PxfStatsElem *get_data_statistics(GPHDUri* hadoop_uri, ClientContext *cl_context, StringInfo err_msg);
+extern PxfFragmentStatsElem *get_fragments_statistics(GPHDUri* hadoop_uri, ClientContext *cl_context);
 extern List* get_data_fragment_list(GPHDUri *hadoop_uri,  ClientContext* client_context);
 extern void free_fragment(DataFragment *fragment);
 extern List* get_hcat_metadata(GPHDUri* hadoop_uri, char *location, ClientContext *client_context);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/include/access/pxfuriparser.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfuriparser.h b/src/include/access/pxfuriparser.h
index 334deb3..4e239a5 100644
--- a/src/include/access/pxfuriparser.h
+++ b/src/include/access/pxfuriparser.h
@@ -12,7 +12,7 @@
  * All PXF's resources are under /PXF_SERVICE_PREFIX/PXF_VERSION/...
  */
 #define PXF_SERVICE_PREFIX "pxf"
-#define PXF_VERSION "v13" /* PXF version */
+#define PXF_VERSION "v14" /* PXF version */
 
 /*
  * FragmentData - describes a single Hadoop file split / HBase table region

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/include/cdb/cdbanalyze.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbanalyze.h b/src/include/cdb/cdbanalyze.h
index 7cc2f01..1840c7d 100644
--- a/src/include/cdb/cdbanalyze.h
+++ b/src/include/cdb/cdbanalyze.h
@@ -34,10 +34,4 @@
 extern const int gp_external_table_default_number_of_pages;
 extern const int gp_external_table_default_number_of_tuples;
 
-void gp_statistics_estimate_reltuples_relpages_external_pxf(Relation rel,
-															StringInfo location,
-															float4 *reltuples,
-															float4 *relpages,
-															StringInfo err_msg);
-
 #endif   /* CDBANALYZE_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/include/commands/analyzeutils.h
----------------------------------------------------------------------
diff --git a/src/include/commands/analyzeutils.h b/src/include/commands/analyzeutils.h
index 0ac3d20..59706a3 100644
--- a/src/include/commands/analyzeutils.h
+++ b/src/include/commands/analyzeutils.h
@@ -12,6 +12,8 @@
 #ifndef ANALYZEUTILS_H
 #define ANALYZEUTILS_H
 
+#include "utils/array.h"
+
 /* extern functions called by commands/analyze.c */
 extern void aggregate_leaf_partition_MCVs(Oid relationOid,
 		AttrNumber attnum,
@@ -23,4 +25,37 @@ extern void aggregate_leaf_partition_histograms(Oid relationOid,
 		ArrayType **result);
 extern bool datumCompare(Datum d1, Datum d2, Oid opFuncOid);
 
+/*
+ * Helper functions for ANALYZE.
+ */
+
+/**
+ * Drops the sample table created during ANALYZE.
+ */
+extern void dropSampleTable(Oid sampleTableOid, bool isExternal);
+
+/**
+ * Generates a table name for the auxiliary sample table that may be created during ANALYZE.
+ */
+extern char* temporarySampleTableName(Oid relationOid, char* prefix);
+
+/* Convenience */
+extern ArrayType * SPIResultToArray(int resultAttributeNumber, MemoryContext allocationContext);
+
+/* spi execution helpers */
+typedef void (*spiCallback)(void *clientDataOut);
+extern void spiExecuteWithCallback(const char *src, bool read_only, long tcount,
+           spiCallback callbackFn, void *clientData);
+
+typedef struct
+{
+    int numColumns;
+    MemoryContext memoryContext;
+    ArrayType ** output;
+} EachResultColumnAsArraySpec;
+
+extern void spiCallback_getEachResultColumnAsArray(void *clientData);
+extern void spiCallback_getProcessedAsFloat4(void *clientData);
+extern void spiCallback_getSingleResultRowColumnAsFloat4(void *clientData);
+
 #endif  /* ANALYZEUTILS_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/include/utils/guc.h
----------------------------------------------------------------------
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 14ed6a2..1623ce8 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -454,6 +454,7 @@ extern char   *gp_snmp_debug_log;
 /* Extension Framework GUCs */
 extern bool   pxf_enable_filter_pushdown; /* turn pushdown logic on/off     */
 extern bool   pxf_enable_stat_collection; /* turn off stats collection if needed */
+extern int    pxf_stat_max_fragments; /* max fragments to be sampled during analyze */
 extern bool   pxf_enable_locality_optimizations; /* turn locality optimization in the data allocation algorithm on/off     */
 /*
  * Is Isilon the target storage system ?



[2/3] incubator-hawq git commit: HAWQ-44. Advanced statistics for PXF tables.

Posted by nh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 06ff72b..a734ade 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -33,6 +33,9 @@ public class ProtocolData extends InputData {
     protected String host;
     protected String profile;
     protected String token;
+    // statistics parameters
+    protected int statsMaxFragments;
+    protected float statsSampleRatio;
 
     /**
      * Constructs a ProtocolData. Parses X-GP-* configuration variables.
@@ -88,6 +91,10 @@ public class ProtocolData extends InputData {
         dataFragment = INVALID_SPLIT_IDX;
         parseDataFragment(getOptionalProperty("DATA-FRAGMENT"));
 
+        statsMaxFragments = 0;
+        statsSampleRatio = 0;
+        parseStatsParameters();
+
         // Store alignment for global use as a system property
         System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
     }
@@ -120,10 +127,8 @@ public class ProtocolData extends InputData {
         this.remoteLogin = copy.remoteLogin;
         this.remoteSecret = copy.remoteSecret;
         this.token = copy.token;
-    }
-
-    public String getToken() {
-        return token;
+        this.statsMaxFragments = copy.statsMaxFragments;
+        this.statsSampleRatio = copy.statsSampleRatio;
     }
 
     /**
@@ -264,6 +269,38 @@ public class ProtocolData extends InputData {
     }
 
     /**
+     * Returns Kerberos token information.
+     *
+     * @return token
+     */
+    public String getToken() {
+        return token;
+    }
+
+    /**
+     * Statistics parameter. Returns the max number of fragments to return for
+     * ANALYZE sampling. The value is set in HAWQ side using the GUC
+     * pxf_stats_max_fragments.
+     *
+     * @return max number of fragments to be processed by analyze
+     */
+    public int getStatsMaxFragments() {
+        return statsMaxFragments;
+    }
+
+    /**
+     * Statistics parameter. Returns a number between 0.0001 and 1.0,
+     * representing the sampling ratio on each fragment for ANALYZE sampling.
+     * The value is set in HAWQ side based on ANALYZE computations and the
+     * number of sampled fragments.
+     *
+     * @return sampling ratio
+     */
+    public float getStatsSampleRatio() {
+        return statsSampleRatio;
+    }
+
+    /**
      * Sets the thread safe parameter. Default value - true.
      */
     private void parseThreadSafe() {
@@ -371,4 +408,34 @@ public class ProtocolData extends InputData {
         remoteLogin = getOptionalProperty("REMOTE-USER");
         remoteSecret = getOptionalProperty("REMOTE-PASS");
     }
+
+    private void parseStatsParameters() {
+
+        String maxFrags = getOptionalProperty("STATS-MAX-FRAGMENTS");
+        if (!StringUtils.isEmpty(maxFrags)) {
+            statsMaxFragments = Integer.parseInt(maxFrags);
+            if (statsMaxFragments <= 0) {
+                throw new IllegalArgumentException("Wrong value '"
+                        + statsMaxFragments + "'. "
+                        + "STATS-MAX-FRAGMENTS must be a positive integer");
+            }
+        }
+
+        String sampleRatioStr = getUserProperty("STATS-SAMPLE-RATIO");
+        if (!StringUtils.isEmpty(sampleRatioStr)) {
+            statsSampleRatio = Float.parseFloat(sampleRatioStr);
+            if (statsSampleRatio < 0.0001 || statsSampleRatio > 1.0) {
+                throw new IllegalArgumentException(
+                        "Wrong value '"
+                                + statsSampleRatio
+                                + "'. "
+                                + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+            }
+        }
+
+        if ((statsSampleRatio > 0) != (statsMaxFragments > 0)) {
+            throw new IllegalArgumentException(
+                    "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
index 137622d..57fb0f4 100644
--- a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/BridgeOutputBuilderTest.java
@@ -2,66 +2,156 @@ package org.apache.hawq.pxf.service;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Test;
-
 import org.apache.hawq.pxf.api.BadRecordException;
 import org.apache.hawq.pxf.api.OneField;
 import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.BufferWritable;
 import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.Writable;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.junit.Test;
 
 public class BridgeOutputBuilderTest {
 
+    /**
+     * Test class to check the data inside BufferWritable.
+     */
+    private class DataOutputToBytes implements DataOutput {
+
+        byte[] output;
+
+        public byte[] getOutput() {
+            return output;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            output = b;
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeBoolean(boolean v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeByte(int v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeShort(int v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeChar(int v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeInt(int v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeLong(long v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeFloat(float v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeDouble(double v) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeBytes(String s) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeChars(String s) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void writeUTF(String s) throws IOException {
+            throw new IOException("not implemented");
+        }
+    }
+
     private static final int UN_SUPPORTED_TYPE = -1;
     private GPDBWritable output = null;
+    private DataOutputToBytes dos = new DataOutputToBytes();
 
     @Test
     public void testFillGPDBWritable() throws Exception {
         Map<String, String> parameters = new HashMap<String, String>();
         parameters.put("X-GP-ATTRS", "14");
 
-        addColumn(parameters, 0, DataType.INTEGER,   "col0");
-        addColumn(parameters, 1, DataType.FLOAT8,    "col1");
-        addColumn(parameters, 2, DataType.REAL,      "col2");
-        addColumn(parameters, 3, DataType.BIGINT,    "col3");
-        addColumn(parameters, 4, DataType.SMALLINT,  "col4");
-        addColumn(parameters, 5, DataType.BOOLEAN,   "col5");
-        addColumn(parameters, 6, DataType.BYTEA,     "col6");
-        addColumn(parameters, 7, DataType.VARCHAR,   "col7");
-        addColumn(parameters, 8, DataType.BPCHAR,    "col8");
-        addColumn(parameters, 9, DataType.CHAR,      "col9");
-        addColumn(parameters, 10, DataType.TEXT,     "col10");
-        addColumn(parameters, 11, DataType.NUMERIC,  "col11");
-        addColumn(parameters, 12, DataType.TIMESTAMP,"col12");
-        addColumn(parameters, 13, DataType.DATE,     "col13");
-
+        addColumn(parameters, 0, DataType.INTEGER, "col0");
+        addColumn(parameters, 1, DataType.FLOAT8, "col1");
+        addColumn(parameters, 2, DataType.REAL, "col2");
+        addColumn(parameters, 3, DataType.BIGINT, "col3");
+        addColumn(parameters, 4, DataType.SMALLINT, "col4");
+        addColumn(parameters, 5, DataType.BOOLEAN, "col5");
+        addColumn(parameters, 6, DataType.BYTEA, "col6");
+        addColumn(parameters, 7, DataType.VARCHAR, "col7");
+        addColumn(parameters, 8, DataType.BPCHAR, "col8");
+        addColumn(parameters, 9, DataType.CHAR, "col9");
+        addColumn(parameters, 10, DataType.TEXT, "col10");
+        addColumn(parameters, 11, DataType.NUMERIC, "col11");
+        addColumn(parameters, 12, DataType.TIMESTAMP, "col12");
+        addColumn(parameters, 13, DataType.DATE, "col13");
 
         BridgeOutputBuilder builder = makeBuilder(parameters);
         output = builder.makeGPDBWritableOutput();
 
-        List<OneField> recFields = Arrays.asList(new OneField(DataType.INTEGER.getOID(), 0),
-                new OneField(DataType.FLOAT8.getOID(), (double) 0),
-                new OneField(DataType.REAL.getOID(), (float) 0),
-                new OneField(DataType.BIGINT.getOID(), (long) 0),
-                new OneField(DataType.SMALLINT.getOID(), (short) 0),
-                new OneField(DataType.BOOLEAN.getOID(), true),
-                new OneField(DataType.BYTEA.getOID(), new byte[]{0}),
-                new OneField(DataType.VARCHAR.getOID(), "value"),
-                new OneField(DataType.BPCHAR.getOID(), "value"),
-                new OneField(DataType.CHAR.getOID(), "value"),
-                new OneField(DataType.TEXT.getOID(), "value"),
-                new OneField(DataType.NUMERIC.getOID(), "0"),
-                new OneField(DataType.TIMESTAMP.getOID(), new Timestamp(0)),
+        List<OneField> recFields = Arrays.asList(
+                new OneField(DataType.INTEGER.getOID(), 0), new OneField(
+                        DataType.FLOAT8.getOID(), (double) 0), new OneField(
+                        DataType.REAL.getOID(), (float) 0), new OneField(
+                        DataType.BIGINT.getOID(), (long) 0), new OneField(
+                        DataType.SMALLINT.getOID(), (short) 0), new OneField(
+                        DataType.BOOLEAN.getOID(), true), new OneField(
+                        DataType.BYTEA.getOID(), new byte[] { 0 }),
+                new OneField(DataType.VARCHAR.getOID(), "value"), new OneField(
+                        DataType.BPCHAR.getOID(), "value"), new OneField(
+                        DataType.CHAR.getOID(), "value"), new OneField(
+                        DataType.TEXT.getOID(), "value"), new OneField(
+                        DataType.NUMERIC.getOID(), "0"), new OneField(
+                        DataType.TIMESTAMP.getOID(), new Timestamp(0)),
                 new OneField(DataType.DATE.getOID(), new Date(1)));
         builder.fillGPDBWritable(recFields);
 
@@ -71,30 +161,33 @@ public class BridgeOutputBuilderTest {
         assertEquals(output.getLong(3), Long.valueOf(0));
         assertEquals(output.getShort(4), Short.valueOf((short) 0));
         assertEquals(output.getBoolean(5), true);
-        assertArrayEquals(output.getBytes(6), new byte[]{0});
+        assertArrayEquals(output.getBytes(6), new byte[] { 0 });
         assertEquals(output.getString(7), "value\0");
         assertEquals(output.getString(8), "value\0");
         assertEquals(output.getString(9), "value\0");
         assertEquals(output.getString(10), "value\0");
         assertEquals(output.getString(11), "0\0");
         assertEquals(Timestamp.valueOf(output.getString(12)), new Timestamp(0));
-        assertEquals(Date.valueOf(output.getString(13).trim()).toString(), new Date(1).toString());
+        assertEquals(Date.valueOf(output.getString(13).trim()).toString(),
+                new Date(1).toString());
     }
 
     @Test
     public void testFillOneGPDBWritableField() throws Exception {
         Map<String, String> parameters = new HashMap<String, String>();
-        parameters.put("X-GP-ATTRS", "1");    	
-        addColumn(parameters, 0, DataType.INTEGER, "col0");    	
+        parameters.put("X-GP-ATTRS", "1");
+        addColumn(parameters, 0, DataType.INTEGER, "col0");
         BridgeOutputBuilder builder = makeBuilder(parameters);
         output = builder.makeGPDBWritableOutput();
 
-        OneField unSupportedField = new OneField(UN_SUPPORTED_TYPE, new Byte((byte) 0));
+        OneField unSupportedField = new OneField(UN_SUPPORTED_TYPE, new Byte(
+                (byte) 0));
         try {
             builder.fillOneGPDBWritableField(unSupportedField, 0);
             fail("Unsupported data type should throw exception");
         } catch (UnsupportedOperationException e) {
-            assertEquals(e.getMessage(), "Byte is not supported for HAWQ conversion");
+            assertEquals(e.getMessage(),
+                    "Byte is not supported for HAWQ conversion");
         }
     }
 
@@ -113,10 +206,10 @@ public class BridgeOutputBuilderTest {
 
         /* all four fields */
         List<OneField> complete = Arrays.asList(
-                new OneField(DataType.INTEGER.getOID(), 10),
-                new OneField(DataType.INTEGER.getOID(), 20),
-                new OneField(DataType.INTEGER.getOID(), 30),
-                new OneField(DataType.INTEGER.getOID(), 40));
+                new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+                        DataType.INTEGER.getOID(), 20), new OneField(
+                        DataType.INTEGER.getOID(), 30), new OneField(
+                        DataType.INTEGER.getOID(), 40));
         builder.fillGPDBWritable(complete);
         assertEquals(output.getColType().length, 4);
         assertEquals(output.getInt(0), Integer.valueOf(10));
@@ -126,13 +219,14 @@ public class BridgeOutputBuilderTest {
 
         /* two fields instead of four */
         List<OneField> incomplete = Arrays.asList(
-        		new OneField(DataType.INTEGER.getOID(), 10),
-    			new OneField(DataType.INTEGER.getOID(), 20));
+                new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+                        DataType.INTEGER.getOID(), 20));
         try {
             builder.fillGPDBWritable(incomplete);
             fail("testRecordBiggerThanSchema should have failed on - Record has 2 fields but the schema size is 4");
         } catch (BadRecordException e) {
-            assertEquals(e.getMessage(), "Record has 2 fields but the schema size is 4");
+            assertEquals(e.getMessage(),
+                    "Record has 2 fields but the schema size is 4");
         }
     }
 
@@ -151,16 +245,17 @@ public class BridgeOutputBuilderTest {
 
         /* five fields instead of four */
         List<OneField> complete = Arrays.asList(
-                new OneField(DataType.INTEGER.getOID(), 10),
-                new OneField(DataType.INTEGER.getOID(), 20),
-                new OneField(DataType.INTEGER.getOID(), 30),
-                new OneField(DataType.INTEGER.getOID(), 40),
-                new OneField(DataType.INTEGER.getOID(), 50));
+                new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+                        DataType.INTEGER.getOID(), 20), new OneField(
+                        DataType.INTEGER.getOID(), 30), new OneField(
+                        DataType.INTEGER.getOID(), 40), new OneField(
+                        DataType.INTEGER.getOID(), 50));
         try {
             builder.fillGPDBWritable(complete);
             fail("testRecordBiggerThanSchema should have failed on - Record has 5 fields but the schema size is 4");
         } catch (BadRecordException e) {
-            assertEquals(e.getMessage(), "Record has 5 fields but the schema size is 4");
+            assertEquals(e.getMessage(),
+                    "Record has 5 fields but the schema size is 4");
         }
     }
 
@@ -179,25 +274,171 @@ public class BridgeOutputBuilderTest {
 
         /* last field is REAL while schema requires INT */
         List<OneField> complete = Arrays.asList(
-                new OneField(DataType.INTEGER.getOID(), 10),
-                new OneField(DataType.INTEGER.getOID(), 20),
-                new OneField(DataType.INTEGER.getOID(), 30),
-                new OneField(DataType.REAL.getOID(), 40.0));
+                new OneField(DataType.INTEGER.getOID(), 10), new OneField(
+                        DataType.INTEGER.getOID(), 20), new OneField(
+                        DataType.INTEGER.getOID(), 30), new OneField(
+                        DataType.REAL.getOID(), 40.0));
         try {
             builder.fillGPDBWritable(complete);
             fail("testFieldTypeMismatch should have failed on - For field 3 schema requires type INTEGER but input record has type REAL");
         } catch (BadRecordException e) {
-            assertEquals(e.getMessage(), "For field col3 schema requires type INTEGER but input record has type REAL");
+            assertEquals(e.getMessage(),
+                    "For field col3 schema requires type INTEGER but input record has type REAL");
+        }
+    }
+
+    @Test
+    public void convertTextDataToLines() throws Exception {
+
+        String data = "Que sara sara\n" + "Whatever will be will be\n"
+                + "We are going\n" + "to Wembeley!\n";
+        byte[] dataBytes = data.getBytes();
+        String[] dataLines = new String[] {
+                "Que sara sara\n",
+                "Whatever will be will be\n",
+                "We are going\n",
+                "to Wembeley!\n" };
+
+        OneField field = new OneField(DataType.BYTEA.getOID(), dataBytes);
+        List<OneField> fields = new ArrayList<OneField>();
+        fields.add(field);
+
+        Map<String, String> parameters = new HashMap<String, String>();
+        parameters.put("X-GP-ATTRS", "1");
+        addColumn(parameters, 0, DataType.TEXT, "col0");
+        // activate sampling code
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "100");
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "1.00");
+
+        BridgeOutputBuilder builder = makeBuilder(parameters);
+        LinkedList<Writable> outputQueue = builder.makeOutput(fields);
+
+        assertEquals(4, outputQueue.size());
+
+        for (int i = 0; i < dataLines.length; ++i) {
+            Writable line = outputQueue.get(i);
+            compareBufferWritable(line, dataLines[i]);
         }
-    }   
 
-    private void addColumn(Map<String, String> parameters, int idx, DataType dataType, String name) {
+        assertNull(builder.getPartialLine());
+    }
+
+    @Test
+    public void convertTextDataToLinesPartial() throws Exception {
+        String data = "oh well\n" + "what the hell";
+
+        OneField field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        List<OneField> fields = new ArrayList<OneField>();
+        fields.add(field);
+
+        Map<String, String> parameters = new HashMap<String, String>();
+        parameters.put("X-GP-ATTRS", "1");
+        addColumn(parameters, 0, DataType.TEXT, "col0");
+        // activate sampling code
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "100");
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "1.00");
+
+        BridgeOutputBuilder builder = makeBuilder(parameters);
+        LinkedList<Writable> outputQueue = builder.makeOutput(fields);
+
+        assertEquals(1, outputQueue.size());
+
+        Writable line = outputQueue.get(0);
+        compareBufferWritable(line, "oh well\n");
+
+        Writable partial = builder.getPartialLine();
+        assertNotNull(partial);
+        compareBufferWritable(partial, "what the hell");
+
+        // check that append works
+        data = " but the show must go on\n" + "!!!\n";
+        field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        fields.clear();
+        fields.add(field);
+
+        outputQueue = builder.makeOutput(fields);
+
+        assertNull(builder.getPartialLine());
+        assertEquals(2, outputQueue.size());
+
+        line = outputQueue.get(0);
+        compareBufferWritable(line, "what the hell but the show must go on\n");
+        line = outputQueue.get(1);
+        compareBufferWritable(line, "!!!\n");
+
+        // check that several partial lines gets appended to each other
+        data = "I want to ride my bicycle\n" + "I want to ride my bike";
+
+        field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        fields.clear();
+        fields.add(field);
+
+        outputQueue = builder.makeOutput(fields);
+
+        assertEquals(1, outputQueue.size());
+
+        line = outputQueue.get(0);
+        compareBufferWritable(line, "I want to ride my bicycle\n");
+
+        partial = builder.getPartialLine();
+        assertNotNull(partial);
+        compareBufferWritable(partial, "I want to ride my bike");
+
+        // data consisting of one long line
+        data = " I want to ride my bicycle";
+
+        field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        fields.clear();
+        fields.add(field);
+
+        outputQueue = builder.makeOutput(fields);
+
+        assertEquals(0, outputQueue.size());
+
+        partial = builder.getPartialLine();
+        assertNotNull(partial);
+        compareBufferWritable(partial,
+                "I want to ride my bike I want to ride my bicycle");
+
+        // data with lines
+        data = " bicycle BICYCLE\n" + "bicycle BICYCLE\n";
+
+        field = new OneField(DataType.BYTEA.getOID(), data.getBytes());
+        fields.clear();
+        fields.add(field);
+
+        outputQueue = builder.makeOutput(fields);
+
+        assertEquals(2, outputQueue.size());
+
+        line = outputQueue.get(0);
+        compareBufferWritable(line,
+                "I want to ride my bike I want to ride my bicycle bicycle BICYCLE\n");
+        line = outputQueue.get(1);
+        compareBufferWritable(line, "bicycle BICYCLE\n");
+
+        partial = builder.getPartialLine();
+        assertNull(partial);
+
+    }
+
+    private void compareBufferWritable(Writable line, String expected)
+            throws IOException {
+        assertTrue(line instanceof BufferWritable);
+        line.write(dos);
+        assertArrayEquals(expected.getBytes(), dos.getOutput());
+    }
+
+    private void addColumn(Map<String, String> parameters, int idx,
+                           DataType dataType, String name) {
         parameters.put("X-GP-ATTR-NAME" + idx, name);
-        parameters.put("X-GP-ATTR-TYPECODE" + idx, Integer.toString(dataType.getOID()));
+        parameters.put("X-GP-ATTR-TYPECODE" + idx,
+                Integer.toString(dataType.getOID()));
         parameters.put("X-GP-ATTR-TYPENAME" + idx, dataType.toString());
     }
 
-    private BridgeOutputBuilder makeBuilder(Map<String, String> parameters) throws Exception {
+    private BridgeOutputBuilder makeBuilder(Map<String, String> parameters)
+            throws Exception {
 
         parameters.put("X-GP-ALIGNMENT", "8");
         parameters.put("X-GP-SEGMENT-ID", "-44");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
new file mode 100644
index 0000000..b5d4f9a
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/ReadSamplingBridgeTest.java
@@ -0,0 +1,225 @@
+package org.apache.hawq.pxf.service;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.ReadSamplingBridge;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ AnalyzeUtils.class, ReadSamplingBridge.class })
+public class ReadSamplingBridgeTest {
+
+    /**
+     * Writable test object to test ReadSamplingBridge. The object receives a
+     * string and returns it in its toString function.
+     */
+    public class WritableTest implements Writable {
+
+        private String data;
+
+        public WritableTest(String data) {
+            this.data = data;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            throw new IOException("not implemented");
+        }
+
+        @Override
+        public String toString() {
+            return data;
+        }
+
+    }
+
+    private ProtocolData mockProtData;
+    private ReadBridge mockBridge;
+    private ReadSamplingBridge readSamplingBridge;
+    private int recordsLimit = 0;
+    private BitSet samplingBitSet;
+    private Writable result;
+
+    @Test
+    public void getNextRecord100Percent() throws Exception {
+
+        samplingBitSet.set(0, 100);
+        recordsLimit = 100;
+        when(mockProtData.getStatsSampleRatio()).thenReturn((float) 1.0);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        result = readSamplingBridge.getNext();
+        assertEquals("0", result.toString());
+
+        result = readSamplingBridge.getNext();
+        assertEquals("1", result.toString());
+
+        when(mockBridge.getNext()).thenReturn(null);
+
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Test
+    public void getNextRecord100Records10Percent() throws Exception {
+
+        // set 10 bits from 5 to 14.
+        samplingBitSet.set(5, 15);
+        recordsLimit = 100;
+        when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.1);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        for (int i = 0; i < 10; i++) {
+            result = readSamplingBridge.getNext();
+            assertEquals("" + (i + 5), result.toString());
+        }
+
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Test
+    public void getNextRecord100Records90Percent() throws Exception {
+        int expected = 0;
+
+        // set the first odd numbers until 20, then all numbers until 100
+        // total: 90.
+        samplingBitSet.set(0, 100);
+        for (int i = 0; i < 10; i++) {
+            samplingBitSet.flip(i * 2);
+        }
+        recordsLimit = 100;
+        when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.9);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        for (int i = 0; i < 90; i++) {
+            result = readSamplingBridge.getNext();
+            if (i < 10) {
+                expected = (i * 2) + 1;
+            } else {
+                expected = i + 10;
+            }
+            assertEquals("" + expected, result.toString());
+        }
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Test
+    public void getNextRecord350Records50Percent() throws Exception {
+
+        // set bits 0, 40-79 (40) and 90-98 (9)
+        // total 50.
+        samplingBitSet.set(0);
+        samplingBitSet.set(40, 80);
+        samplingBitSet.set(90, 99);
+        recordsLimit = 350;
+        when(mockProtData.getStatsSampleRatio()).thenReturn((float) 0.5);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        /*
+         * expecting to have: 50 (out of first 100) 50 (out of second 100) 50
+         * (out of third 100) 11 (out of last 50) --- 161 records
+         */
+        for (int i = 0; i < 161; i++) {
+            result = readSamplingBridge.getNext();
+            assertNotNull(result);
+            if (i % 50 == 0) {
+                assertEquals("" + (i * 2), result.toString());
+            }
+        }
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Test
+    public void getNextRecord100000Records30Sample() throws Exception {
+        int expected = 0;
+
+        // ratio = 0.0003
+        float ratio = (float) (30.0 / 100000.0);
+
+        // set 3 records in every 10000.
+        samplingBitSet.set(99);
+        samplingBitSet.set(999);
+        samplingBitSet.set(9999);
+        recordsLimit = 100000;
+        when(mockProtData.getStatsSampleRatio()).thenReturn(ratio);
+
+        readSamplingBridge = new ReadSamplingBridge(mockProtData);
+
+        for (int i = 0; i < 30; i++) {
+            result = readSamplingBridge.getNext();
+            assertNotNull(result);
+            int residue = i % 3;
+            int div = i / 3;
+            if (residue == 0) {
+                expected = 99 + (div * 10000);
+            } else if (residue == 1) {
+                expected = 999 + (div * 10000);
+            } else {
+                expected = 9999 + (div * 10000);
+            }
+            assertEquals("" + expected, result.toString());
+        }
+        result = readSamplingBridge.getNext();
+        assertNull(result);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        mockProtData = mock(ProtocolData.class);
+
+        mockBridge = mock(ReadBridge.class);
+        PowerMockito.whenNew(ReadBridge.class).withAnyArguments().thenReturn(
+                mockBridge);
+
+        when(mockBridge.getNext()).thenAnswer(new Answer<Writable>() {
+            private int count = 0;
+
+            @Override
+            public Writable answer(InvocationOnMock invocation)
+                    throws Throwable {
+                if (count >= recordsLimit) {
+                    return null;
+                }
+                return new WritableTest("" + (count++));
+            }
+        });
+
+        PowerMockito.mockStatic(AnalyzeUtils.class);
+        samplingBitSet = new BitSet();
+        when(
+                AnalyzeUtils.generateSamplingBitSet(any(int.class),
+                        any(int.class))).thenReturn(samplingBitSet);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
new file mode 100644
index 0000000..984fcf6
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/io/BufferWritableTest.java
@@ -0,0 +1,22 @@
+package org.apache.hawq.pxf.service.io;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class BufferWritableTest {
+
+    @Test
+    public void append() throws Exception {
+        String data1 = "פרק ראשון ובו יסופר יסופר";
+        String data2 = "פרק שני ובו יסופר יסופר";
+
+        BufferWritable bw1 = new BufferWritable(data1.getBytes());
+
+        assertArrayEquals(data1.getBytes(), bw1.buf);
+
+        bw1.append(data2.getBytes());
+
+        assertArrayEquals((data1+data2).getBytes(), bw1.buf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
new file mode 100644
index 0000000..39bb266
--- /dev/null
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtilsTest.java
@@ -0,0 +1,117 @@
+package org.apache.hawq.pxf.service.utilities;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+public class AnalyzeUtilsTest {
+
+    @Test
+    public void generateSamplingBitSet() throws Exception {
+        runGenerateSamplingBitSetTest(10, 5, new int[]{0, 3, 4, 6, 9});
+
+        runGenerateSamplingBitSetTest(9, 8, new int[] {0, 2, 3, 4, 5, 6, 7, 8});
+
+        runGenerateSamplingBitSetTest(10, 10, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+        runGenerateSamplingBitSetTest(8, 0, new int[]{});
+
+        runGenerateSamplingBitSetTest(8, 3, new int[]{0, 3, 6});
+    }
+
+    @Test
+    public void generateSamplingBitSetBig() throws Exception {
+        BitSet result = AnalyzeUtils.generateSamplingBitSet(1000000, 990000);
+        assertEquals(result.cardinality(), 990000);
+        assertTrue(result.length() < 1000000);
+
+        result = AnalyzeUtils.generateSamplingBitSet(1000000000, 5000000);
+        assertEquals(result.cardinality(), 5000000);
+        assertTrue(result.length() < 1000000000);
+    }
+
+    @Test
+    public void getSampleFragments() throws Exception {
+        // fragments less than threshold
+        runGetSampleFragmentsTest(4, 100, 4, new int[] {0, 1, 2, 3});
+
+        // fragments over threshold
+        runGetSampleFragmentsTest(4, 2, 2, new int[]{0, 3});
+        runGetSampleFragmentsTest(10, 2, 2, new int[]{0, 6});
+        runGetSampleFragmentsTest(10, 3, 3, new int[]{0, 4, 8});
+        runGetSampleFragmentsTest(10, 9, 9, new int[]{0, 1, 2, 4, 5, 6, 7, 8, 9 });
+        runGetSampleFragmentsTest(15, 10, 10, new int[]{0, 2, 3, 4, 6, 7, 8, 10, 12, 14});
+        runGetSampleFragmentsTest(1000, 10, 10,
+                new int[]{0, 101, 202, 303, 404, 505, 606, 707, 808, 909});
+        runGetSampleFragmentsTest(100, 65, 65,
+                new int[]{0, 1, 2, 4, 5, 6, 8, 9, 10,       /* 9 elements */
+                          12, 13, 14, 16, 17, 18,           /* 6 elements */
+                          20, 21, 22, 24, 25, 26, 28, 29,   /* 8 elements */
+                          30, 32, 33, 34, 36, 37, 38,       /* 7 elements */
+                          40, 41, 42, 44, 45, 46, 48, 49,   /* 8 elements */
+                          50, 52, 53, 54, 56, 57, 58,       /* 7 elements */
+                          60, 62, 64, 66, 68,               /* 5 elements */
+                          70, 72, 74, 76, 78,               /* 5 elements */
+                          80, 82, 84, 86, 88,               /* 5 elements */
+                          90, 92, 94, 96, 98                /* 5 elements */
+                          });
+                                                            /* => 65 elements */
+        // threshold illegal and ignored
+        runGetSampleFragmentsTest(10, 0, 10, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+    }
+
+    private void runGenerateSamplingBitSetTest(int poolSize, int sampleSize, int[] expectedIndexes) throws Exception {
+        BitSet expected = new BitSet();
+        for (int i: expectedIndexes) {
+            expected.set(i);
+        }
+        BitSet result = AnalyzeUtils.generateSamplingBitSet(poolSize, sampleSize);
+
+        Assert.assertEquals(expected, result);
+    }
+
+    private void runGetSampleFragmentsTest(int inputSize, int maxFragments, int expectedSize, int[] expectedIndexes) throws Exception {
+        ProtocolData mockProtocolData = mock(ProtocolData.class);
+        when(mockProtocolData.getStatsMaxFragments()).thenReturn(maxFragments);
+
+        List<Fragment> fragments = new ArrayList<Fragment>();
+
+        for (int i = 0; i < inputSize; i++) {
+            fragments.add(prepareFragment(i));
+        }
+        assertEquals(inputSize, fragments.size());
+
+        List<Fragment> result = AnalyzeUtils.getSampleFragments(fragments, mockProtocolData);
+
+        List<Fragment> expected = new ArrayList<Fragment>();
+
+        for (int i: expectedIndexes) {
+            expected.add(prepareFragment(i));
+        }
+
+        assertEquals("verify number of returned fragments", expectedSize, result.size());
+
+        for (int i = 0; i < expectedSize; i++) {
+            Assert.assertEquals("compare fragment #" + i, expected.get(i).getIndex(), result.get(i).getIndex());
+        }
+    }
+
+    private Fragment prepareFragment(int i) {
+        Fragment fragment = new Fragment("fragment" + i, null, null);
+        fragment.setIndex(i);
+        return fragment;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
index 194f148..bd31b19 100644
--- a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/utilities/ProtocolDataTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({UserGroupInformation.class, ProfilesConf.class})
+@PrepareForTest({ UserGroupInformation.class, ProfilesConf.class })
 public class ProtocolDataTest {
     Map<String, String> parameters;
 
@@ -47,7 +47,8 @@ public class ProtocolDataTest {
         assertEquals(protocolData.getAccessor(), "are");
         assertEquals(protocolData.getResolver(), "packed");
         assertEquals(protocolData.getDataSource(), "i'm/ready/to/go");
-        assertEquals(protocolData.getUserProperty("i'm-standing-here"), "outside-your-door");
+        assertEquals(protocolData.getUserProperty("i'm-standing-here"),
+                "outside-your-door");
         assertEquals(protocolData.getParametersMap(), parameters);
         assertNull(protocolData.getLogin());
         assertNull(protocolData.getSecret());
@@ -66,10 +67,12 @@ public class ProtocolDataTest {
 
         Map<String, String> mockedProfiles = new HashMap<>();
         mockedProfiles.put("wHEn you trY yOUR bESt", "but you dont succeed");
-        mockedProfiles.put("when YOU get WHAT you WANT", "but not what you need");
+        mockedProfiles.put("when YOU get WHAT you WANT",
+                "but not what you need");
         mockedProfiles.put("when you feel so tired", "but you cant sleep");
 
-        when(ProfilesConf.getProfilePluginsMap("a profile")).thenReturn(mockedProfiles);
+        when(ProfilesConf.getProfilePluginsMap("a profile")).thenReturn(
+                mockedProfiles);
 
         parameters.put("x-gp-profile", "a profile");
         parameters.put("when you try your best", "and you do succeed");
@@ -79,7 +82,8 @@ public class ProtocolDataTest {
             new ProtocolData(parameters);
             fail("Duplicate property should throw IllegalArgumentException");
         } catch (IllegalArgumentException iae) {
-            assertEquals("Profile 'a profile' already defines: [when YOU get WHAT you WANT, wHEn you trY yOUR bESt]",
+            assertEquals(
+                    "Profile 'a profile' already defines: [when YOU get WHAT you WANT, wHEn you trY yOUR bESt]",
                     iae.getMessage());
         }
     }
@@ -169,8 +173,9 @@ public class ProtocolDataTest {
             new ProtocolData(parameters);
             fail("should fail with bad fragment metadata");
         } catch (Exception e) {
-            assertEquals(e.getMessage(), "Fragment metadata information must be Base64 encoded." +
-                    "(Bad value: " + badValue + ")");
+            assertEquals(e.getMessage(),
+                    "Fragment metadata information must be Base64 encoded."
+                            + "(Bad value: " + badValue + ")");
         }
     }
 
@@ -197,6 +202,116 @@ public class ProtocolDataTest {
         assertEquals("UTF8_計算機用語_00000000", protocolData.getFilterString());
     }
 
+    @Test
+    public void noStatsParams() {
+        ProtocolData protData = new ProtocolData(parameters);
+
+        assertEquals(0, protData.getStatsMaxFragments());
+        assertEquals(0, protData.getStatsSampleRatio(), 0.1);
+    }
+
+    @Test
+    public void statsParams() {
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "10101");
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "0.039");
+
+        ProtocolData protData = new ProtocolData(parameters);
+
+        assertEquals(10101, protData.getStatsMaxFragments());
+        assertEquals(0.039, protData.getStatsSampleRatio(), 0.01);
+    }
+
+    @Test
+    public void statsMissingParams() {
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "13");
+        try {
+            new ProtocolData(parameters);
+            fail("missing X-GP-STATS-SAMPLE-RATIO parameter");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together");
+        }
+
+        parameters.remove("X-GP-STATS-MAX-FRAGMENTS");
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "1");
+        try {
+            new ProtocolData(parameters);
+            fail("missing X-GP-STATS-MAX-FRAGMENTS parameter");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together");
+        }
+    }
+
+    @Test
+    public void statsSampleRatioNegative() {
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "101");
+
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Wrong value '101.0'. "
+                            + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+        }
+
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "0");
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Wrong value '0.0'. "
+                            + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+        }
+
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "0.00005");
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+        } catch (IllegalArgumentException e) {
+            assertEquals(
+                    e.getMessage(),
+                    "Wrong value '5.0E-5'. "
+                            + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0");
+        }
+
+        parameters.put("X-GP-STATS-SAMPLE-RATIO", "a");
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-SAMPLE-RATIO value");
+        } catch (NumberFormatException e) {
+            assertEquals(e.getMessage(), "For input string: \"a\"");
+        }
+    }
+
+    @Test
+    public void statsMaxFragmentsNegative() {
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "10.101");
+
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-MAX-FRAGMENTS value");
+        } catch (NumberFormatException e) {
+            assertEquals(e.getMessage(), "For input string: \"10.101\"");
+        }
+
+        parameters.put("X-GP-STATS-MAX-FRAGMENTS", "0");
+
+        try {
+            new ProtocolData(parameters);
+            fail("wrong X-GP-STATS-MAX-FRAGMENTS value");
+        } catch (IllegalArgumentException e) {
+            assertEquals(e.getMessage(), "Wrong value '0'. "
+                    + "STATS-MAX-FRAGMENTS must be a positive integer");
+        }
+    }
+
     /*
      * setUp function called before each test
      */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/access/external/Makefile b/src/backend/access/external/Makefile
index afc3734..fec5a34 100644
--- a/src/backend/access/external/Makefile
+++ b/src/backend/access/external/Makefile
@@ -10,7 +10,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = fileam.o url.o libchurl.o hd_work_mgr.o pxfuriparser.o pxfheaders.o \
-pxfmasterapi.o ha_config.o pxfcomutils.o pxfutils.o pxffilters.o
+pxfmasterapi.o ha_config.o pxfcomutils.o pxfutils.o pxffilters.o pxfanalyze.o
 
 include $(top_srcdir)/src/backend/common.mk
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c
index 36f11a8..6aa0736 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -233,51 +233,41 @@ static void assign_pxf_port_to_fragments(int remote_rest_port, List *fragments)
 }
 
 /*
- * Fetches statistics of the PXF datasource from the PXF service
+ * Fetches fragments statistics of the PXF datasource from the PXF service
  *
- * The function will generate a delegation token when secure filesystem mode 
+ * The function will generate a delegation token when secure filesystem mode
  * is on and cancel it right after.
  */
-PxfStatsElem *get_pxf_statistics(char *uri, Relation rel, StringInfo err_msg)
+PxfFragmentStatsElem *get_pxf_fragments_statistics(char *uri, Relation rel)
 {
 	ClientContext client_context; /* holds the communication info */
 	char *analyzer = NULL;
 	char *profile = NULL;
 	PxfInputData inputData = {0};
-	PxfStatsElem *result = NULL;
-	
+	PxfFragmentStatsElem *result = NULL;
+
 	GPHDUri* hadoop_uri = init(uri, &client_context);
 	if (!hadoop_uri)
-		return NULL;
-
-	/*
-	 * Get the statistics info from REST only if analyzer is defined
-     */
-	if(GPHDUri_get_value_for_opt(hadoop_uri, "analyzer", &analyzer, false) != 0 &&
-	   GPHDUri_get_value_for_opt(hadoop_uri, "profile", &profile, false) != 0)
 	{
-		if (err_msg)
-			appendStringInfo(err_msg, "no ANALYZER or PROFILE option in table definition");
-		return NULL;
+		elog(ERROR, "Failed to parse PXF location %s", uri);
 	}
-	
+
 	/*
 	 * Enrich the curl HTTP header
 	 */
 	inputData.headers = client_context.http_headers;
 	inputData.gphduri = hadoop_uri;
-	inputData.rel = rel; 
-	inputData.filterstr = NULL; /* We do not supply filter data to the HTTP header */	
+	inputData.rel = rel;
+	inputData.filterstr = NULL; /* We do not supply filter data to the HTTP header */
     generate_delegation_token(&inputData);
 	build_http_header(&inputData);
-	
-	result = get_data_statistics(hadoop_uri, &client_context, err_msg);
+
+	result = get_fragments_statistics(hadoop_uri, &client_context);
 
 	cancel_delegation_token(&inputData);
 	return result;
 }
 
-
 /*
  * Preliminary uri parsing and curl initializations for the REST communication
  */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/pxfanalyze.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfanalyze.c b/src/backend/access/external/pxfanalyze.c
new file mode 100644
index 0000000..eb5a17a
--- /dev/null
+++ b/src/backend/access/external/pxfanalyze.c
@@ -0,0 +1,740 @@
+/*-------------------------------------------------------------------------
+ *
+ * pxfanalyze.c
+ *	  Helper functions to perform ANALYZE on PXF tables.
+ *
+ * Copyright (c) 2007-2012, Greenplum inc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <curl/curl.h>
+#include <json/json.h>
+#include "access/hd_work_mgr.h"
+#include "access/pxfanalyze.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_exttable.h"
+#include "cdb/cdbanalyze.h"
+#include "commands/analyzeutils.h"
+#include "lib/stringinfo.h"
+#include "nodes/makefuncs.h"
+#include "utils/builtins.h"
+#include "utils/elog.h"
+#include "utils/guc.h"
+#include "utils/lsyscache.h"
+
+
+static void buildPxfTableCopy(Oid relationOid,
+		float4	samplingRatio,
+		int pxfStatMaxFragments,
+		const char* schemaName, const char* tableName,
+		const char* sampleSchemaName, const char* pxfSampleTable);
+static void buildSampleFromPxf(const char* sampleSchemaName,
+		const char* sampleTableName,
+		const char* pxfSampleTable,
+		List *lAttributeNames,
+		float4 *sampleTableRelTuples);
+
+static float4 calculateSamplingRatio(float4 relTuples,
+									 float4 relFrags,
+									 float4 requestedSampleSize);
+
+static char* parseFormat(char fmtcode);
+static char* escape_unprintables(const char *src);
+static char* escape_fmtopts_string(const char *src);
+static char* custom_fmtopts_string(const char *src);
+static void printExtTable(Oid relationOid, ExtTableEntry* extTable);
+static char* createPxfSampleStmt(Oid relationOid,
+		const char* schemaName, const char* tableName,
+		const char* sampleSchemaName, const char* pxfSampleTable,
+		float4 pxf_sample_ratio, int pxf_max_fragments);
+static float4 getPxfFragmentTupleCount(Oid relationOid);
+static float4 countFirstFragmentTuples(const char* schemaName,
+									   const char* tableName);
+static void getFragmentStats(Relation rel, StringInfo location,
+							 float4 *numfrags, float4 *firstfragsize,
+							 float4 *totalsize);
+
+
+void analyzePxfEstimateReltuplesRelpages(Relation relation,
+		StringInfo location,
+		float4* estimatedRelTuples,
+		float4* estimatedRelPages)
+{
+
+	float4 numFrags = 0.0;
+	float4 firstFragSize = 0.0;
+	float4 totalSize = 0.0;
+
+	float4 firstFragTuples = 0.0;
+	float4 estimatedTuples = 0.0;
+
+	/* get number of fragments, size of first fragment and total size.
+	 * This is used together with the number of tuples in first fragment
+	 * to estimate the number of tuples in the table. */
+	getFragmentStats(relation, location, &numFrags, &firstFragSize, &totalSize);
+
+	/* get number of tuples from first fragment */
+	firstFragTuples = getPxfFragmentTupleCount(relation->rd_id);
+
+	/* calculate estimated tuple count */
+	if (firstFragTuples > 0)
+	{
+		Assert(firstFragSize > 0);
+		Assert(totalSize > 0);
+		/* The calculation:
+		 * size of each tuple = first fragment size / first fragment row
+		 * total size = size of each tuple * number of tuples
+		 * number of tuples = total size / size of each tuple
+		 */
+		estimatedTuples = (totalSize / firstFragSize) * firstFragTuples;
+	}
+
+	elog(DEBUG2, "Estimated tuples for PXF table: %f. (first fragment count %f, fragments number %f, old estimate %f)",
+		 estimatedTuples, firstFragTuples, numFrags, *estimatedRelTuples);
+
+	*estimatedRelTuples = estimatedTuples;
+	*estimatedRelPages = numFrags;
+
+	/* relpages can't be 0 if there are tuples in the table. */
+	if ((*estimatedRelPages < 1.0) && (estimatedTuples > 0))
+	{
+		*estimatedRelPages = 1.0;
+	}
+
+	/* in case there were problems with the PXF service, keep the defaults */
+	if (*estimatedRelPages < 0)
+	{
+		*estimatedRelPages =  gp_external_table_default_number_of_pages;
+	}
+	if (*estimatedRelTuples < 0)
+	{
+		*estimatedRelTuples =  gp_external_table_default_number_of_tuples;
+	}
+}
+
+/*
+ * Creates a sample table with data from a PXF table.
+ * We need to create a copy of the PXF table, in order to pass the sampling
+ * parameters pxf_sample_ratio and pxf_max_fragments as attributes,
+ * and to create a segment reject limit of 25 percent.
+ *
+ * The new PXF table is sampled and the results are saved in the returned sample table.
+ * Note that ANALYZE can be executed only by the database owner.
+ * It is safe to assume that the database owner has permissions to create temp tables.
+ * The sampling is done by uniformly sampling pxf_sample_ratio records of each fragments,
+ * up to pxf_max_fragments.
+ *
+ * Input:
+ * 	relationOid 	- relation to be sampled
+ * 	sampleTableName - sample table name, moderately unique
+ * 	lAttributeNames - attributes to be included in the sample
+ * 	relTuples		- estimated size of relation
+ * 	relFrags		- estimated number of fragments in relation
+ * 	requestedSampleSize - as determined by attribute statistics requirements.
+ * 	sampleTableRelTuples	- limit on size of the sample.
+ * Output:
+ * 	sampleTableRelTuples - number of tuples in the sample table created.
+ */
+Oid buildPxfSampleTable(Oid relationOid,
+		char* sampleTableName,
+		List *lAttributeNames,
+		float4	relTuples,
+		float4  relFrags,
+		float4 	requestedSampleSize,
+		float4 *sampleTableRelTuples)
+{
+	const char *schemaName = get_namespace_name(get_rel_namespace(relationOid)); /* must be pfreed */
+	const char *tableName = get_rel_name(relationOid); /* must be pfreed */
+	char	*sampleSchemaName = pstrdup("pg_temp");
+	char	*pxfSampleTable = temporarySampleTableName(relationOid, "pg_analyze_pxf"); /* must be pfreed */
+	Oid			sampleTableOid = InvalidOid;
+	Oid			pxfSampleTableOid = InvalidOid;
+	RangeVar 	*rangeVar = NULL;
+	float4 pxfSamplingRatio = 0.0;
+
+	Assert(requestedSampleSize > 0.0);
+	Assert(relTuples > 0.0);
+	Assert(relFrags > 0.0);
+
+	/* calculate pxf_sample_ratio */
+	pxfSamplingRatio = calculateSamplingRatio(relTuples, relFrags, requestedSampleSize);
+
+	/* build copy of original pxf table */
+	buildPxfTableCopy(relationOid,
+					  pxfSamplingRatio,
+					  pxf_stat_max_fragments,
+					  schemaName, tableName,
+					  sampleSchemaName, pxfSampleTable);
+
+	rangeVar = makeRangeVar(NULL /*catalogname*/, sampleSchemaName, pxfSampleTable, -1);
+	pxfSampleTableOid = RangeVarGetRelid(rangeVar, true /* failOK */, false /*allowHcatalog*/);
+
+	buildSampleFromPxf(sampleSchemaName, sampleTableName, pxfSampleTable,
+					   lAttributeNames, sampleTableRelTuples);
+
+	rangeVar = makeRangeVar(NULL /*catalogname*/, sampleSchemaName, sampleTableName, -1);
+	sampleTableOid = RangeVarGetRelid(rangeVar, true /* failOK */, false /*allowHcatalog*/);
+
+	Assert(sampleTableOid != InvalidOid);
+
+	/**
+	 * MPP-10723: Very rarely, we may be unlucky and generate an empty sample table. We error out in this case rather than
+	 * generate bad statistics.
+	 */
+
+	if (*sampleTableRelTuples < 1.0)
+	{
+		elog(ERROR, "ANALYZE unable to generate accurate statistics on table %s.%s. Try lowering gp_analyze_relative_error",
+				quote_identifier(schemaName),
+				quote_identifier(tableName));
+	}
+
+	if (pxfSampleTableOid != InvalidOid)
+	{
+		elog(DEBUG2, "ANALYZE dropping PXF sample table");
+		dropSampleTable(pxfSampleTableOid, true);
+	}
+
+	pfree((void *) rangeVar);
+	pfree((void *) pxfSampleTable);
+	pfree((void *) tableName);
+	pfree((void *) schemaName);
+	pfree((void *) sampleSchemaName);
+	return sampleTableOid;
+}
+
+/*
+ * Creates an external PXF table, with the same properties
+ * as the given PXF table to be sampled, other than additional
+ * 2 attributes in the location clause -
+ * pxf_stats_sample_ratio and pxf_stats_max_fragments,
+ * and a segment reject limit of 25 percent.
+ */
+static void buildPxfTableCopy(Oid relationOid,
+		float4 samplingRatio,
+		int pxfStatMaxFragments,
+		const char* schemaName, const char* tableName,
+		const char* sampleSchemaName, const char* pxfSampleTable)
+{
+
+	/* create table string */
+	char* createPxfSampleStr = createPxfSampleStmt(relationOid,
+			schemaName, tableName,
+			sampleSchemaName, pxfSampleTable,
+			samplingRatio, pxfStatMaxFragments);
+
+	spiExecuteWithCallback(createPxfSampleStr, false /*readonly*/, 0 /*tcount */,
+			NULL, NULL);
+
+	pfree(createPxfSampleStr);
+
+	elog(DEBUG2, "Created PXF table %s.%s for sampling PXF table %s.%s",
+			quote_identifier(sampleSchemaName),
+			quote_identifier(pxfSampleTable),
+			quote_identifier(schemaName),
+			quote_identifier(tableName));
+}
+
+/*
+ * Creates and populates a sample table for a PXF table.
+ * The actual queried table is not the original PXF table but a copy of it
+ * with additional attributes to enable sampling.
+ *
+ * The results are stored in sampleTableRelTuples.
+ */
+static void buildSampleFromPxf(const char* sampleSchemaName,
+		const char* sampleTableName,
+		const char* pxfSampleTable,
+		List *lAttributeNames,
+		float4 *sampleTableRelTuples)
+{
+	int nAttributes = -1;
+	int i = 0;
+	ListCell *le = NULL;
+	StringInfoData str;
+
+	initStringInfo(&str);
+
+	appendStringInfo(&str, "create table %s.%s as (select ",
+			quote_identifier(sampleSchemaName), quote_identifier(sampleTableName));
+
+	nAttributes = list_length(lAttributeNames);
+
+	foreach_with_count(le, lAttributeNames, i)
+	{
+		appendStringInfo(&str, "Ta.%s", quote_identifier((const char *) lfirst(le)));
+		if (i < nAttributes - 1)
+		{
+			appendStringInfo(&str, ", ");
+		}
+		else
+		{
+			appendStringInfo(&str, " ");
+		}
+	}
+
+	appendStringInfo(&str, "from %s.%s as Ta) distributed randomly",
+			quote_identifier(sampleSchemaName),
+			quote_identifier(pxfSampleTable));
+
+	/* in case of PXF error, analyze on this table will reverted */
+	spiExecuteWithCallback(str.data, false /*readonly*/, 0 /*tcount */,
+			spiCallback_getProcessedAsFloat4, sampleTableRelTuples);
+
+	pfree(str.data);
+
+	elog(DEBUG2, "Created sample table %s.%s with nrows=%.0f",
+			quote_identifier(sampleSchemaName),
+			quote_identifier(sampleTableName),
+			*sampleTableRelTuples);
+}
+
+/*
+ * Returns a sampling ratio - a fraction between 1.0 and 0.0001
+ * representing how many samples should be returned from each fragment
+ * of a PXF table.
+ * The ratio is calculated based on the tuples estimate of the table
+ * and on the number of the actually sampled fragments
+ * (GUC pxf_stat_max_fragments), by the following formula:
+ * ratio = (<sample size> / <tuples estimate>) * (<total # fragments> / <fragments to be sampled>)
+ * If the ratio is too big or small, it is corrected to 1.0 or 0.0001 respectively.
+ *
+ * Input:
+ * 	relTuples		- number of tuples in the table
+ * 	relFrags		- number of fragments in the table
+ * 	requestedSampleSize - number of sample tuples required
+ * Output:
+ * 	the sampling ratio for the table.
+ */
+static float4 calculateSamplingRatio(float4 relTuples,
+		 float4 relFrags,
+		 float4 requestedSampleSize)
+{
+	float4 sampleRatio = 0.0;
+
+	Assert(relFrags > 0);
+	Assert(relTuples > 0);
+	Assert(requestedSampleSize > 0);
+
+	/* sample ratio for regular tables */
+	sampleRatio = requestedSampleSize / relTuples;
+
+	if (pxf_stat_max_fragments < relFrags)
+	{
+		/*
+		 * Correct ratio according to the number of sampled fragments.
+		 * If there are less fragments to sample, the ratio should be increased.
+		 * If the corrected sampling ratio is > 100%, make it 100%
+		 */
+		sampleRatio = sampleRatio * (relFrags / pxf_stat_max_fragments);
+		if (sampleRatio > 1.0)
+		{
+			sampleRatio = 1.0;
+		}
+	}
+
+	/*
+	 * If the ratio is too low (< 0.0001), correct it to 0.0001.
+	 * That means that the lowest rate we will get is 1 tuple per 10,000.
+	 */
+	if (sampleRatio < 0.0001)
+	{
+		sampleRatio = 0.0001;
+	}
+
+	elog(DEBUG2, "PXF ANALYZE: pxf_stats_sample_ratio = %f, pxf_stats_max_fragments = %d, table fragments = %f",
+		 sampleRatio, pxf_stat_max_fragments, relFrags);
+	return sampleRatio;
+}
+
+static char* parseFormat(char fmtcode)
+{
+	if (fmttype_is_custom(fmtcode))
+		return "CUSTOM";
+	if (fmttype_is_text(fmtcode))
+		return "TEXT";
+	if (fmttype_is_csv(fmtcode))
+		return "CSV";
+
+	elog(ERROR, "Unrecognized external table format '%c'", fmtcode);
+	return NULL;
+}
+
+/* Helper functions from dumputils.c, modified to backend (malloc->palloc) */
+
+/*
+ * Escape any unprintables (0x00 - 0x1F) in given string
+ */
+char *
+escape_unprintables(const char *src)
+{
+	int			len = strlen(src),
+				i,
+				j;
+	char	   *result = palloc0(len * 4 + 1);
+	if (!result)
+		return NULL; /* out of memory */
+
+	for (i = 0, j = 0; i < len; i++)
+	{
+		if ((src[i] <= '\x1F') && (src[i] != '\x09' /* TAB */))
+		{
+			snprintf(&(result[j]), 5, "\\x%02X", src[i]);
+			j += 4;
+		}
+		else
+			result[j++] = src[i];
+	}
+	result[j] = '\0';
+	return result;
+}
+
+/*
+ * Escape backslashes and apostrophes in EXTERNAL TABLE format strings.
+ *
+ * The fmtopts field of a pg_exttable tuple has an odd encoding -- it is
+ * partially parsed and contains "string" values that aren't legal SQL.
+ * Each string value is delimited by apostrophes and is usually, but not
+ * always, a single character.	The fmtopts field is typically something
+ * like {delimiter '\x09' null '\N' escape '\'} or
+ * {delimiter ',' null '' escape '\' quote '''}.  Each backslash and
+ * apostrophe in a string must be escaped and each string must be
+ * prepended with an 'E' denoting an "escape syntax" string.
+ *
+ * Usage note: A field value containing an apostrophe followed by a space
+ * will throw this algorithm off -- it presumes no embedded spaces.
+ */
+static char* escape_fmtopts_string(const char *src)
+{
+	int			len = strlen(src);
+	int			i;
+	int			j;
+	char	   *result = palloc0(len * 2 + 1);
+	bool		inString = false;
+
+	for (i = 0, j = 0; i < len; i++)
+	{
+		switch (src[i])
+		{
+			case '\'':
+				if (inString)
+				{
+					/*
+					 * Escape apostrophes *within* the string. If the
+					 * apostrophe is at the end of the source string or is
+					 * followed by a space, it is presumed to be a closing
+					 * apostrophe and is not escaped.
+					 */
+					if ((i + 1) == len || src[i + 1] == ' ')
+						inString = false;
+					else
+						result[j++] = '\\';
+				}
+				else
+				{
+					result[j++] = 'E';
+					inString = true;
+				}
+				break;
+			case '\\':
+				result[j++] = '\\';
+				break;
+		}
+
+		result[j++] = src[i];
+	}
+
+	result[j] = '\0';
+	return result;
+}
+
+/*
+ * Tokenize a fmtopts string (for use with 'custom' formatters)
+ * i.e. convert it to: a = b, format.
+ * (e.g.:  formatter E'fixedwidth_in null E' ' preserve_blanks E'on')
+ */
+static char* custom_fmtopts_string(const char *src)
+{
+		int			len = src ? strlen(src) : 0;
+		char	   *result = palloc0(len * 2 + 1);
+		char	   *srcdup = src ? pstrdup(src) : NULL;
+		char	   *srcdup_start = srcdup;
+		char       *find_res = NULL;
+		int        last = 0;
+
+		if(!src || !srcdup || !result)
+			return NULL;
+
+		while (srcdup)
+		{
+			/* find first word (a) */
+			find_res = strchr(srcdup, ' ');
+			if (!find_res)
+				break;
+			strncat(result, srcdup, (find_res - srcdup));
+			/* skip space */
+			srcdup = find_res + 1;
+			/* remove E if E' */
+			if((strlen(srcdup) > 2) && (srcdup[0] == 'E') && (srcdup[1] == '\''))
+				srcdup++;
+			/* add " = " */
+			strncat(result, " = ", 3);
+			/* find second word (b) until second '
+			   find \' combinations and ignore them */
+			find_res = strchr(srcdup + 1, '\'');
+			while (find_res && (*(find_res - 1) == '\\') /* ignore \' */)
+			{
+				find_res = strchr(find_res + 1, '\'');
+			}
+			if (!find_res)
+				break;
+			strncat(result, srcdup, (find_res - srcdup + 1));
+			srcdup = find_res + 1;
+			/* skip space and add ',' */
+			if (srcdup && srcdup[0] == ' ')
+			{
+				srcdup++;
+				strncat(result, ",", 1);
+			}
+		}
+
+		/* fix string - remove trailing ',' or '=' */
+		last = strlen(result)-1;
+		if(result[last] == ',' || result[last] == '=')
+			result[last]='\0';
+
+		pfree(srcdup_start);
+		return result;
+}
+
+static void printExtTable(Oid relationOid, ExtTableEntry* extTable)
+{
+
+	if (extTable == NULL)
+		return;
+
+	elog(DEBUG2, "extTable params: oid: %d command: %s, encoding: %d, "
+			"format: %c (%s), error table oid: %d, format options: %s, "
+			"is web: %d, is writable: %d, locations size: %d, "
+			"reject limit: %d, reject limit type: %c",
+			relationOid,
+			extTable->command ? extTable->command : "NULL",
+			extTable->encoding,
+			extTable->fmtcode,
+			parseFormat(extTable->fmtcode),
+			extTable->fmterrtbl,
+			extTable->fmtopts,
+			extTable->isweb,
+			extTable->iswritable,
+			list_length(extTable->locations),
+			extTable->rejectlimit,
+			extTable->rejectlimittype == -1 ? 'n' : extTable->rejectlimittype);
+}
+
+/*
+ * This method returns an SQL command to create a PXF table
+ * which is a copy of a given PXF table relationOid, with the following changes:
+ * - PXF sample table name is pg_temp.pg_analyze_pxf_<relationOid>
+ * - LOCATION part is appended 2 attributes - pxf_sample_ratio, pxf_max_fragments.
+ * - in case of error table - SEGMENT REJECT LIMIT 25 PERCENT
+ *
+ * Input:
+ * 	relationOid 		- relation to be copied
+ * 	schemaName 			- schema name of original table
+ * 	tableName			- table name of original table
+ * 	sampleSchemaName	- schema name of new table
+ * 	pxfSampleTable		- table name or new table
+ * 	pxf_sample_ratio	- ratio of samplings to be done per fragment
+ * 	pxf_max_fragments	- max number of fragments to be sampled
+ * Output:
+ * 	SQL statement string for creating the new table
+ */
+static char* createPxfSampleStmt(Oid relationOid,
+		const char* schemaName, const char* tableName,
+		const char* sampleSchemaName, const char* pxfSampleTable,
+		float4 pxf_sample_ratio, int pxf_max_fragments)
+{
+	ExtTableEntry *extTable = GetExtTableEntry(relationOid);
+	StringInfoData str;
+	initStringInfo(&str);
+	char* location = NULL;
+	char* tmpstring = NULL;
+	char* escapedfmt = NULL;
+	char* tabfmt = NULL;
+	char* customfmt = NULL;
+
+	printExtTable(relationOid, extTable);
+
+	location = escape_unprintables(((Value*)list_nth(extTable->locations, 0))->val.str /*pxfLocation*/);
+
+	appendStringInfo(&str, "CREATE EXTERNAL TABLE %s.%s (LIKE %s.%s) "
+			"LOCATION(E'%s&STATS-SAMPLE-RATIO=%.4f&STATS-MAX-FRAGMENTS=%d') ",
+			quote_identifier(sampleSchemaName),
+			quote_identifier(pxfSampleTable),
+			quote_identifier(schemaName),
+			quote_identifier(tableName),
+			location,
+			pxf_sample_ratio,
+			pxf_max_fragments);
+
+	pfree(location);
+
+	/* add FORMAT clause */
+	escapedfmt = escape_fmtopts_string((const char *) extTable->fmtopts);
+	tmpstring = escape_unprintables((const char *) escapedfmt);
+	pfree(escapedfmt);
+	escapedfmt = NULL;
+
+	switch (extTable->fmtcode)
+	{
+	case 't':
+		tabfmt = "text";
+		break;
+	case 'b':
+		/*
+		 * b denotes that a custom format is used.
+		 * the fmtopts string should be formatted as:
+		 * a1 = 'val1',...,an = 'valn'
+		 *
+		 */
+		tabfmt = "custom";
+		customfmt = custom_fmtopts_string(tmpstring);
+		break;
+	default:
+		tabfmt = "csv";
+	}
+	appendStringInfo(&str, "FORMAT '%s' (%s) ",
+			tabfmt,
+			customfmt ? customfmt : tmpstring);
+	pfree(tmpstring);
+	tmpstring = NULL;
+	if (customfmt)
+	{
+		pfree(customfmt);
+		customfmt = NULL;
+	}
+	/* add ENCODING clause */
+	appendStringInfo(&str, "ENCODING '%s' ", pg_encoding_to_char(extTable->encoding));
+
+	/* add error control clause */
+	if (extTable->rejectlimit != -1)
+	{
+		appendStringInfo(&str, "%s", "SEGMENT REJECT LIMIT 25 PERCENT ");
+	}
+
+	elog(DEBUG2, "createPxfSampleStmt SQL statement: %s", str.data);
+
+	return str.data;
+}
+
+/*
+ * Returns the number of tuples in the first fragment of given
+ * PXF table.
+ * This is done by creating a copy of the PXF table, with additional parameters
+ * limiting the query to the first fragment only (pxf_max_fragments = 1, pxf_sample_ratio = 1.0),
+ * and running a COUNT query on it.
+ * The tuple count result is returned.
+ *
+ * Input:
+ * 	relationOid 	- relation to be sampled
+ */
+static float4 getPxfFragmentTupleCount(Oid relationOid)
+{
+	const char *schemaName = get_namespace_name(get_rel_namespace(relationOid)); /* must be pfreed */
+	const char *tableName = get_rel_name(relationOid); /* must be pfreed */
+	char	*sampleSchemaName = pstrdup("pg_temp");
+	char	*pxfEstimateTable = temporarySampleTableName(relationOid, "pg_analyze_pxf_est"); /* must be pfreed */
+	Oid			pxfEstimateTableOid = InvalidOid;
+	RangeVar 	*rangeVar = NULL;
+	float4	ntuples = -1.0;
+
+	/* build copy of original pxf table */
+	buildPxfTableCopy(relationOid,
+					  1.0, /* get all tuples */
+					  1, /* query only first fragment */
+					  schemaName, tableName,
+					  sampleSchemaName, pxfEstimateTable);
+
+	rangeVar = makeRangeVar(NULL /*catalogname*/, sampleSchemaName, pxfEstimateTable, -1);
+	pxfEstimateTableOid = RangeVarGetRelid(rangeVar, true /* failOK */, false /*allowHcatalog*/);
+
+	if (pxfEstimateTableOid == InvalidOid)
+	{
+		elog(ERROR, "Unable to create a copy of PXF table %s.%s", schemaName, tableName);
+	}
+
+	/* run count query */
+	ntuples = countFirstFragmentTuples(sampleSchemaName, pxfEstimateTable);
+
+	Assert(pxfEstimateTable != InvalidOid);
+
+	elog(DEBUG2, "ANALYZE dropping PXF estimate table %s.%s (%d)",
+		 sampleSchemaName, pxfEstimateTable, pxfEstimateTableOid);
+	dropSampleTable(pxfEstimateTableOid, true);
+
+	pfree((void *) rangeVar);
+	pfree((void *) pxfEstimateTable);
+	pfree((void *) tableName);
+	pfree((void *) schemaName);
+	pfree((void *) sampleSchemaName);
+
+	return ntuples;
+}
+
+static float4 countFirstFragmentTuples(const char* schemaName,
+									   const char* tableName)
+{
+	float ntuples = -1.0;
+	StringInfoData str;
+
+	initStringInfo(&str);
+	appendStringInfo(&str, "select count(*)::float4 from %s.%s",
+			quote_identifier(schemaName),
+			quote_identifier(tableName));
+
+	/* in case of PXF error, analyze on this table will be reverted */
+	spiExecuteWithCallback(str.data, false /*readonly*/, 0 /*tcount */,
+						   spiCallback_getSingleResultRowColumnAsFloat4, &ntuples);
+
+	pfree(str.data);
+
+	elog(DEBUG3, "count() of first pxf fragment gives %f values.", ntuples);
+
+	return ntuples;
+}
+
+/* --------------------------------
+ *		getFragmentStats  -
+ *
+ *		Fetch number of fragments, size of first fragment and total size of datasource,
+ *		for an external table which is PXF
+ * --------------------------------
+ */
+static void getFragmentStats(Relation rel, StringInfo location,
+							 float4 *numfrags, float4 *firstfragsize,
+							 float4 *totalsize)
+{
+
+	PxfFragmentStatsElem *elem = NULL;
+	elem = get_pxf_fragments_statistics(location->data, rel);
+
+	/*
+	 * if get_pxf_fragments_statistics returned NULL - probably a communication error, we
+	 * error out.
+	 */
+	if (!elem)
+	{
+		elog(ERROR, "No statistics were returned for relation %s", RelationGetRelationName(rel));
+	}
+
+	*numfrags = elem->numFrags;
+	*firstfragsize = elem->firstFragSize;
+	*totalsize = elem->totalSize;
+	pfree(elem);
+
+	elog(DEBUG2, "ANALYZE estimate for PXF table %s: fragments %f, first frag size %f, "
+			"total size %f [max int %d]",
+			RelationGetRelationName(rel), *numfrags, *firstfragsize, *totalsize, INT_MAX);
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/pxfmasterapi.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfmasterapi.c b/src/backend/access/external/pxfmasterapi.c
index ba505b3..b1a6240 100644
--- a/src/backend/access/external/pxfmasterapi.c
+++ b/src/backend/access/external/pxfmasterapi.c
@@ -30,11 +30,11 @@
 #include "catalog/hcatalog/externalmd.h"
 
 static List* parse_datanodes_response(List *rest_srvrs, StringInfo rest_buf);
-static PxfStatsElem* parse_get_stats_response(StringInfo rest_buf);
+static PxfFragmentStatsElem *parse_get_frag_stats_response(StringInfo rest_buf);
+static float4 normalize_size(long size, char* unit);
 static List* parse_get_fragments_response(List* fragments, StringInfo rest_buf);
 static void ha_failover(GPHDUri *hadoop_uri, ClientContext *client_context, char* rest_msg);
-static void rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *
-);
+static void rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *rest_msg);
 static char* concat(char *body, char *tail);
 
 /*
@@ -72,14 +72,14 @@ parse_datanodes_response(List *rest_srvrs, StringInfo rest_buf)
  * Wrap the REST call with a retry for the HA HDFS scenario
  */
 static void
-rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *restMsg)
+rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *rest_msg)
 {
 	Assert(hadoop_uri->host != NULL && hadoop_uri->port != NULL);
 
 	/* construct the request */
 	PG_TRY();
 	{
-		call_rest(hadoop_uri, client_context, restMsg);
+		call_rest(hadoop_uri, client_context, rest_msg);
 	}
 	PG_CATCH();
 	{
@@ -93,7 +93,7 @@ rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *restMsg)
 			if (!elog_dismiss(DEBUG5))
 				PG_RE_THROW(); /* hope to never get here! */
 
-			ha_failover(hadoop_uri, client_context, restMsg);
+			ha_failover(hadoop_uri, client_context, rest_msg);
 		}
 		else /*This is not HA - so let's re-throw */
 			PG_RE_THROW();
@@ -140,77 +140,83 @@ void free_datanode_rest_server(PxfServer* srv)
 }
 
 /*
- * Fetch the statistics from the PXF service
+ * Fetch fragment statistics from the PXF service
  */
-PxfStatsElem *get_data_statistics(GPHDUri* hadoop_uri,
-										 ClientContext *client_context,
-										 StringInfo err_msg)
+PxfFragmentStatsElem *get_fragments_statistics(GPHDUri* hadoop_uri,
+											   ClientContext *client_context)
 {
-	char *restMsg = concat("http://%s:%s/%s/%s/Analyzer/getEstimatedStats?path=", hadoop_uri->data);
+	char *restMsg = concat("http://%s:%s/%s/%s/Fragmenter/getFragmentsStats?path=", hadoop_uri->data);
 
 	/* send the request. The response will exist in rest_buf.data */
-	PG_TRY();
-	{
-		rest_request(hadoop_uri, client_context, restMsg);
-	}
-	PG_CATCH();
-	{
-		/*
-		 * communication problems with PXF service
-		 * Statistics for a table can be done as part of an ANALYZE procedure on many tables,
-		 * and we don't want to stop because of a communication error. So we catch the exception,
-		 * append its error to err_msg, and return a NULL,
-		 * which will force the the analyze code to use former calculated values or defaults.
-		 */
-		if (err_msg)
-		{
-			char* message = elog_message();
-			if (message)
-				appendStringInfo(err_msg, "%s", message);
-			else
-				appendStringInfo(err_msg, "Unknown error");
-		}
-
-		/* release error state */
-		if (!elog_dismiss(DEBUG5))
-			PG_RE_THROW(); /* hope to never get here! */
-
-		return NULL;
-	}
-	PG_END_TRY();
+	rest_request(hadoop_uri, client_context, restMsg);
 
-	/* parse the JSON response and form a fragments list to return */
-	return parse_get_stats_response(&(client_context->the_rest_buf));
+	/* parse the JSON response and form a statistics struct to return */
+	return parse_get_frag_stats_response(&(client_context->the_rest_buf));
 }
 
 /*
- * Parse the json response from the PXF Fragmenter.getSize
+ * Parse the json response from the PXF Fragmenter.getFragmentsStats
  */
-static PxfStatsElem *parse_get_stats_response(StringInfo rest_buf)
+static PxfFragmentStatsElem *parse_get_frag_stats_response(StringInfo rest_buf)
 {
-	PxfStatsElem* statsElem = (PxfStatsElem*)palloc0(sizeof(PxfStatsElem));
+	PxfFragmentStatsElem* statsElem = (PxfFragmentStatsElem*)palloc0(sizeof(PxfFragmentStatsElem));
 	struct json_object	*whole	= json_tokener_parse(rest_buf->data);
 	if ((whole == NULL) || is_error(whole))
 	{
 		elog(ERROR, "Failed to parse statistics data from PXF");
 	}
-	struct json_object	*head	= json_object_object_get(whole, "PXFDataSourceStats");
-
-	/* 0. block size */
-	struct json_object *js_block_size = json_object_object_get(head, "blockSize");
-	statsElem->blockSize = json_object_get_int(js_block_size);
-
-	/* 1. number of blocks */
-	struct json_object *js_num_blocks = json_object_object_get(head, "numberOfBlocks");
-	statsElem->numBlocks = json_object_get_int(js_num_blocks);
-
-	/* 2. number of tuples */
-	struct json_object *js_num_tuples = json_object_object_get(head, "numberOfTuples");
-	statsElem->numTuples = json_object_get_int(js_num_tuples);
+	struct json_object	*head	= json_object_object_get(whole, "PXFFragmentsStats");
+
+	/* 0. number of fragments */
+	struct json_object *js_num_fragments = json_object_object_get(head, "fragmentsNumber");
+	statsElem->numFrags = json_object_get_int(js_num_fragments);
+
+	/* 1. first fragment size */
+	struct json_object *js_first_frag_size = json_object_object_get(head, "firstFragmentSize");
+	struct json_object *js_size = json_object_object_get(js_first_frag_size, "size");
+	long size = json_object_get_int(js_size);
+	struct json_object *js_unit = json_object_object_get(js_first_frag_size, "unit");
+	char* unit = pstrdup(json_object_get_string(js_unit));
+	statsElem->firstFragSize = normalize_size(size, unit);
+	pfree(unit);
+
+	/* 2. total size */
+	struct json_object *js_total_size = json_object_object_get(head, "totalSize");
+	js_size = json_object_object_get(js_total_size, "size");
+	size = json_object_get_int(js_size);
+	js_unit = json_object_object_get(js_total_size, "unit");
+	unit = pstrdup(json_object_get_string(js_unit));
+	statsElem->totalSize = normalize_size(size, unit);
+	pfree(unit);
 
 	return statsElem;
 }
 
+static float4 normalize_size(long size, char* unit) {
+	const float4 multiplier = 1024.0;
+	if (strcmp(unit,"B") == 0)
+	{
+		return size;
+	}
+	if (strcmp(unit,"KB") == 0)
+	{
+		return size * multiplier;
+	}
+	if (strcmp(unit,"MB") == 0)
+	{
+		return size * multiplier * multiplier;
+	}
+	if (strcmp(unit,"GB") == 0)
+	{
+		return size * multiplier * multiplier * multiplier;
+	}
+	if (strcmp(unit,"TB") == 0)
+	{
+		return size * multiplier * multiplier * multiplier * multiplier;
+	}
+	return -1;
+}
+
 /*
  * ha_failover
  *

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/Makefile b/src/backend/access/external/test/Makefile
index e13d107..5778958 100644
--- a/src/backend/access/external/test/Makefile
+++ b/src/backend/access/external/test/Makefile
@@ -1,7 +1,7 @@
 subdir=src/backend/access/external
 top_builddir=../../../../..
 
-TARGETS=pxfuriparser hd_work_mgr pxfheaders ha_config pxffilters pxfmasterapi
+TARGETS=pxfuriparser hd_work_mgr pxfheaders ha_config pxffilters pxfmasterapi pxfanalyze
 
 # Objects from backend, which don't need to be mocked but need to be linked.
 COMMON_REAL_OBJS=\
@@ -59,7 +59,11 @@ pxfmasterapi_REAL_OBJS=$(COMMON_REAL_OBJS) \
 	$(top_srcdir)/src/backend/utils/fmgrtab.o	
 pxffilters_REAL_OBJS=$(COMMON_REAL_OBJS) \
 	$(top_srcdir)/src/backend/optimizer/util/clauses.o \
-	$(top_srcdir)/src/backend/parser/parse_expr.o    
+	$(top_srcdir)/src/backend/parser/parse_expr.o
+pxfanalyze_REAL_OBJS=$(COMMON_REAL_OBJS) \
+	$(top_srcdir)/src/backend/utils/adt/ruleutils.o \
+	$(top_srcdir)/src/backend/parser/kwlookup.o \
+	$(top_srcdir)/src/backend/utils/mb/encnames.o    
 
 include ../../../../Makefile.mock
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/README.txt
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/README.txt b/src/backend/access/external/test/README.txt
index 75a03f0..dfe73b8 100644
--- a/src/backend/access/external/test/README.txt
+++ b/src/backend/access/external/test/README.txt
@@ -1,6 +1,9 @@
 Directory with the following System Under Test (SUT):
- - pxfuriparser.c
- - hd_work_mgr.c
- - pxfheaders.c
  - ha_config.c
+ - hd_work_mgr.c
+ - pxfanalyze.c
  - pxffilters.c
+ - pxfheaders.c
+ - pxfmasterapi.c
+ - pxfuriparser.c
+ 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
index ce8e61d..91bfa43 100644
--- a/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
+++ b/src/backend/access/external/test/hd_work_mgr_do_segment_clustering_by_host_test.c
@@ -28,15 +28,17 @@
 
 /*
  * check element list_index in segmenet_list
- * has the expected hostip.
+ * has the expected hostip and segindex.
  */
 void check_segment_info(List* segment_list, int list_index,
-						const char* expected_hostip)
+						const char* expected_hostip,
+						int expected_segindex)
 {
 
 	Segment* seg_info =
 			(Segment*)(list_nth(segment_list, list_index));
 	assert_string_equal(seg_info->hostip, expected_hostip);
+	assert_int_equal(seg_info->segindex, expected_segindex);
 }
 
 /*
@@ -76,9 +78,9 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
 	gphost = (GpHost*)lfirst(cell);
 	assert_string_equal(gphost->ip, array_of_segs[0]);
 	assert_int_equal(list_length(gphost->segs), 4);
-    for (int i = 0; i < 4; ++i)
+	for (int i = 0; i < 4; ++i)
 	{
-		check_segment_info(gphost->segs, i, "1.2.3.1");
+		check_segment_info(gphost->segs, i, "1.2.3.1", i);
 	}
 
 	cell = list_nth_cell(groups, 1);
@@ -87,7 +89,7 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
 	assert_int_equal(list_length(gphost->segs), 3);
 	for (int i = 0; i < 3; ++i)
 	{
-		check_segment_info(gphost->segs, i, "1.2.3.2");
+		check_segment_info(gphost->segs, i, "1.2.3.2", i+4);
 	}
 
 	cell = list_nth_cell(groups, 2);
@@ -96,9 +98,8 @@ test__do_segment_clustering_by_host__10SegmentsOn3Hosts(void **state)
 	assert_int_equal(list_length(gphost->segs), 3);
 	for (int i = 0; i < 3; ++i)
 	{
-		check_segment_info(gphost->segs, i, "1.2.3.3");
+		check_segment_info(gphost->segs, i, "1.2.3.3", i+7);
 	}
 
 	freeQueryResource();
 }
-


[3/3] incubator-hawq git commit: HAWQ-44. Advanced statistics for PXF tables.

Posted by nh...@apache.org.
HAWQ-44. Advanced statistics for PXF tables.

PXF sample rows are collected into a temporary table, where statistics are derived of them in the same way ANALYZE works for hawq tables.
Statistics are gathered at 3 stages:
1. Getting general statistics - number of fragments, size of data source, size of first fragment
2. Count of first fragment tuples
HAWQ uses these numbers to determine how many tuples are needed, and these parameters are translated to sampling ratio and number of sampled fragments.
3. Sampling the PXF table based on the sampling ratio and number of fragments to be sampled. The returned tuples are saved in a temporary table.

On the PXF side, a function has been added to the Fragmenter API, to allow gathering the stats of the first stage. In addition, a mechanism to sample rows on the fly was added to the Bridge.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/81385f09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/81385f09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/81385f09

Branch: refs/heads/master
Commit: 81385f09fbbc59b2912b2f8ff9a3edbee6427e9b
Parents: 4dbb347
Author: Noa Horn <nh...@pivotal.io>
Authored: Fri Nov 20 13:28:21 2015 -0800
Committer: Noa Horn <nh...@pivotal.io>
Committed: Fri Nov 20 13:28:21 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hawq/pxf/api/AnalyzerStats.java  |  40 +-
 .../org/apache/hawq/pxf/api/Fragmenter.java     |  29 +-
 .../org/apache/hawq/pxf/api/FragmentsStats.java | 233 ++++++
 .../apache/hawq/pxf/api/FragmentsStatsTest.java |  90 +++
 .../pxf/plugins/hbase/HBaseDataFragmenter.java  |  15 +-
 .../hawq/pxf/plugins/hdfs/HdfsAnalyzer.java     |  73 +-
 .../pxf/plugins/hdfs/HdfsDataFragmenter.java    |  52 +-
 .../pxf/plugins/hdfs/StringPassResolver.java    |   2 +-
 .../pxf/plugins/hive/HiveDataFragmenter.java    |  10 +-
 .../plugins/hive/HiveInputFormatFragmenter.java |   9 +
 .../hawq/pxf/service/AnalyzerFactory.java       |   4 +-
 .../org/apache/hawq/pxf/service/Bridge.java     |   9 +-
 .../hawq/pxf/service/BridgeOutputBuilder.java   | 117 ++-
 .../hawq/pxf/service/FragmentsResponse.java     |   2 +-
 .../pxf/service/FragmentsResponseFormatter.java |   1 -
 .../org/apache/hawq/pxf/service/ReadBridge.java | 112 ++-
 .../hawq/pxf/service/ReadSamplingBridge.java    | 112 +++
 .../apache/hawq/pxf/service/WriteBridge.java    |   9 +-
 .../hawq/pxf/service/io/BufferWritable.java     |  21 +
 .../hawq/pxf/service/rest/BridgeResource.java   |  65 +-
 .../pxf/service/rest/FragmenterResource.java    | 105 ++-
 .../pxf/service/rest/InvalidPathResource.java   |   6 +-
 .../pxf/service/utilities/AnalyzeUtils.java     | 128 ++++
 .../pxf/service/utilities/ProtocolData.java     |  75 +-
 .../pxf/service/BridgeOutputBuilderTest.java    | 357 +++++++--
 .../pxf/service/ReadSamplingBridgeTest.java     | 225 ++++++
 .../hawq/pxf/service/io/BufferWritableTest.java |  22 +
 .../pxf/service/utilities/AnalyzeUtilsTest.java | 117 +++
 .../pxf/service/utilities/ProtocolDataTest.java | 129 +++-
 src/backend/access/external/Makefile            |   2 +-
 src/backend/access/external/hd_work_mgr.c       |  32 +-
 src/backend/access/external/pxfanalyze.c        | 740 +++++++++++++++++++
 src/backend/access/external/pxfmasterapi.c      | 122 +--
 src/backend/access/external/test/Makefile       |   8 +-
 src/backend/access/external/test/README.txt     |   9 +-
 ...ork_mgr_do_segment_clustering_by_host_test.c |  15 +-
 .../access/external/test/pxfanalyze_test.c      | 171 +++++
 .../access/external/test/pxfmasterapi_test.c    |  21 +-
 src/backend/commands/analyze.c                  | 233 +++---
 src/backend/commands/tablecmds.c                |   4 +-
 src/backend/optimizer/util/plancat.c            |  39 +-
 src/backend/utils/misc/guc.c                    |  11 +
 src/include/access/hd_work_mgr.h                |  14 +-
 src/include/access/pxfanalyze.h                 |  37 +
 src/include/access/pxfmasterapi.h               |   2 +-
 src/include/access/pxfuriparser.h               |   2 +-
 src/include/cdb/cdbanalyze.h                    |   6 -
 src/include/commands/analyzeutils.h             |  35 +
 src/include/utils/guc.h                         |   1 +
 49 files changed, 3126 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java
index a06b4f1..3d2c665 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java
@@ -13,8 +13,8 @@ public class AnalyzerStats {
     private static final long DEFAULT_NUMBER_OF_BLOCKS = 1L;
     private static final long DEFAULT_NUMBER_OF_TUPLES = 1000000L;
 
-    private long blockSize;        // block size (in bytes)
-    private long numberOfBlocks;    // number of blocks
+    private long blockSize; // block size (in bytes)
+    private long numberOfBlocks; // number of blocks
     private long numberOfTuples; // number of tuples
 
     /**
@@ -24,8 +24,7 @@ public class AnalyzerStats {
      * @param numberOfBlocks number of blocks
      * @param numberOfTuples number of tuples
      */
-    public AnalyzerStats(long blockSize,
-                         long numberOfBlocks,
+    public AnalyzerStats(long blockSize, long numberOfBlocks,
                          long numberOfTuples) {
         this.setBlockSize(blockSize);
         this.setNumberOfBlocks(numberOfBlocks);
@@ -34,38 +33,41 @@ public class AnalyzerStats {
 
     /** Constructs an AnalyzerStats with the default values */
     public AnalyzerStats() {
-        this(DEFAULT_BLOCK_SIZE, DEFAULT_NUMBER_OF_BLOCKS, DEFAULT_NUMBER_OF_TUPLES);
+        this(DEFAULT_BLOCK_SIZE, DEFAULT_NUMBER_OF_BLOCKS,
+                DEFAULT_NUMBER_OF_TUPLES);
     }
 
     /**
-     * Given an AnalyzerStats, serialize it in JSON to be used as
-     * the result string for HAWQ. An example result is as follows:
-     * {"PXFDataSourceStats":{"blockSize":67108864,"numberOfBlocks":1,"numberOfTuples":5}}
+     * Given an AnalyzerStats, serialize it in JSON to be used as the result
+     * string for HAWQ. An example result is as follows:
+     * {"PXFDataSourceStats":{"blockSize"
+     * :67108864,"numberOfBlocks":1,"numberOfTuples":5}}
      *
      * @param stats the data to be serialized
      * @return the result in json format
      * @throws IOException if converting to JSON format failed
      */
-    public static String dataToJSON(AnalyzerStats stats) throws IOException  {
+    public static String dataToJSON(AnalyzerStats stats) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
         // mapper serializes all members of the class by default
-        return "{\"PXFDataSourceStats\":" + mapper.writeValueAsString(stats) + "}";
+        return "{\"PXFDataSourceStats\":" + mapper.writeValueAsString(stats)
+                + "}";
     }
 
     /**
-     * Given a stats structure, convert it to be readable. Intended
-     * for debugging purposes only.
+     * Given a stats structure, convert it to be readable. Intended for
+     * debugging purposes only.
      *
      * @param stats the data to be stringify
-     * @param datapath the data path part of the original URI (e.g., table name, *.csv, etc.)
-     * @return the stringify data
+     * @param datapath the data path part of the original URI (e.g., table name,
+     *            *.csv, etc.)
+     * @return the stringified data
      */
     public static String dataToString(AnalyzerStats stats, String datapath) {
-        return "Statistics information for \"" + datapath + "\" " +
-                " Block Size: " + stats.blockSize +
-                ", Number of blocks: " + stats.numberOfBlocks +
-                ", Number of tuples: " + stats.numberOfTuples;
-
+        return "Statistics information for \"" + datapath + "\" "
+                + " Block Size: " + stats.blockSize + ", Number of blocks: "
+                + stats.numberOfBlocks + ", Number of tuples: "
+                + stats.numberOfTuples;
     }
 
     public long getBlockSize() {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java
index 4ae057f..cb9cda8 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java
@@ -7,7 +7,8 @@ import java.util.LinkedList;
 import java.util.List;
 
 /**
- * Abstract class that defines the splitting of a data resource into fragments that can be processed in parallel.
+ * Abstract class that defines the splitting of a data resource into fragments
+ * that can be processed in parallel.
  */
 public abstract class Fragmenter extends Plugin {
     protected List<Fragment> fragments;
@@ -23,11 +24,33 @@ public abstract class Fragmenter extends Plugin {
     }
 
     /**
-     * Gets the fragments of a given path (source name and location of each fragment).
-     * Used to get fragments of data that could be read in parallel from the different segments.
+     * Gets the fragments of a given path (source name and location of each
+     * fragment). Used to get fragments of data that could be read in parallel
+     * from the different segments.
      *
      * @return list of data fragments
      * @throws Exception if fragment list could not be retrieved
      */
     public abstract List<Fragment> getFragments() throws Exception;
+
+    /**
+     * Default implementation of statistics for fragments. The default is:
+     * <ul>
+     * <li>number of fragments - as gathered by {@link #getFragments()}</li>
+     * <li>first fragment size - 64MB</li>
+     * <li>total size - number of fragments times first fragment size</li>
+     * </ul>
+     * Each fragmenter implementation can override this method to better match
+     * its fragments stats.
+     *
+     * @return default statistics
+     * @throws Exception if statistics cannot be gathered
+     */
+    public FragmentsStats getFragmentsStats() throws Exception {
+        List<Fragment> fragments = getFragments();
+        long fragmentsNumber = fragments.size();
+        return new FragmentsStats(fragmentsNumber,
+                FragmentsStats.DEFAULT_FRAGMENT_SIZE, fragmentsNumber
+                        * FragmentsStats.DEFAULT_FRAGMENT_SIZE);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FragmentsStats.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FragmentsStats.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FragmentsStats.java
new file mode 100644
index 0000000..004a11c
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FragmentsStats.java
@@ -0,0 +1,233 @@
+package org.apache.hawq.pxf.api;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * FragmentsStats holds statistics for a given path.
+ */
+public class FragmentsStats {
+
+    /**
+     * Default fragment size. Assuming a fragment is equivalent to a block in
+     * HDFS, we guess a full fragment size is 64MB.
+     */
+    public static final long DEFAULT_FRAGMENT_SIZE = 67108864L;
+
+    private static Log Log = LogFactory.getLog(FragmentsStats.class);
+
+    // number of fragments
+    private long fragmentsNumber;
+    // first fragment size
+    private SizeAndUnit firstFragmentSize;
+    // total fragments size
+    private SizeAndUnit totalSize;
+
+    /**
+     * Enum to represent unit (Bytes/KB/MB/GB/TB)
+     */
+    public enum SizeUnit {
+        /**
+         * Byte
+         */
+        B,
+        /**
+         * KB
+         */
+        KB,
+        /**
+         * MB
+         */
+        MB,
+        /**
+         * GB
+         */
+        GB,
+        /**
+         * TB
+         */
+        TB;
+    };
+
+    /**
+     * Container for size and unit
+     */
+    public class SizeAndUnit {
+        long size;
+        SizeUnit unit;
+
+        /**
+         * Default constructor.
+         */
+        public SizeAndUnit() {
+            this.size = 0;
+            this.unit = SizeUnit.B;
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param size size
+         * @param unit unit
+         */
+        public SizeAndUnit(long size, SizeUnit unit) {
+            this.size = size;
+            this.unit = unit;
+        }
+
+        /**
+         * Returns size.
+         *
+         * @return size
+         */
+        public long getSize() {
+            return this.size;
+        }
+
+        /**
+         * Returns unit (Byte/KB/MB/etc.).
+         *
+         * @return unit
+         */
+        public SizeUnit getUnit() {
+            return this.unit;
+        }
+
+        @Override
+        public String toString() {
+            return size + "" + unit;
+        }
+    }
+
+    /**
+     * Constructs an FragmentsStats.
+     *
+     * @param fragmentsNumber number of fragments
+     * @param firstFragmentSize first fragment size (in bytes)
+     * @param totalSize total size (in bytes)
+     */
+    public FragmentsStats(long fragmentsNumber, long firstFragmentSize,
+                          long totalSize) {
+        this.setFragmentsNumber(fragmentsNumber);
+        this.setFirstFragmentSize(firstFragmentSize);
+        this.setTotalSize(totalSize);
+    }
+
+    /**
+     * Given a {@link FragmentsStats}, serialize it in JSON to be used as the
+     * result string for HAWQ. An example result is as follows:
+     * <code>{"PXFFragmentsStats":{"fragmentsNumber"
+     * :3,"firstFragmentSize":67108864,"totalSize":200000000}}</code>
+     *
+     * @param stats the data to be serialized
+     * @return the result in json format
+     * @throws IOException if converting to JSON format failed
+     */
+    public static String dataToJSON(FragmentsStats stats) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        // mapper serializes all members of the class by default
+        return "{\"PXFFragmentsStats\":" + mapper.writeValueAsString(stats)
+                + "}";
+    }
+
+    /**
+     * Given a stats structure, convert it to be readable. Intended for
+     * debugging purposes only.
+     *
+     * @param stats the data to be stringify
+     * @param datapath the data path part of the original URI (e.g., table name,
+     *            *.csv, etc.)
+     * @return the stringified data
+     */
+    public static String dataToString(FragmentsStats stats, String datapath) {
+        return "Statistics information for \"" + datapath + "\" "
+                + " Number of Fragments: " + stats.fragmentsNumber
+                + ", first Fragment size: " + stats.firstFragmentSize
+                + ", total size: " + stats.totalSize;
+    }
+
+    /**
+     * Returns number of fragments for a given data source.
+     *
+     * @return number of fragments
+     */
+    public long getFragmentsNumber() {
+        return fragmentsNumber;
+    }
+
+    private void setFragmentsNumber(long fragmentsNumber) {
+        this.fragmentsNumber = fragmentsNumber;
+    }
+
+    /**
+     * Returns the size in bytes of the first fragment.
+     *
+     * @return first fragment size (in byte)
+     */
+    public SizeAndUnit getFirstFragmentSize() {
+        return firstFragmentSize;
+    }
+
+    private void setFirstFragmentSize(long firstFragmentSize) {
+        this.firstFragmentSize = setSizeAndUnit(firstFragmentSize);
+    }
+
+    /**
+     * Returns the total size of a given source. Usually it means the
+     * aggregation of all its fragments size.
+     *
+     * @return total size
+     */
+    public SizeAndUnit getTotalSize() {
+        return totalSize;
+    }
+
+    private void setTotalSize(long totalSize) {
+        this.totalSize = setSizeAndUnit(totalSize);
+    }
+
+    private SizeAndUnit setSizeAndUnit(long originalSize) {
+        final long THRESHOLD = Integer.MAX_VALUE / 2;
+        int orderOfMagnitude = 0;
+        SizeAndUnit sizeAndUnit = new SizeAndUnit();
+        sizeAndUnit.size = originalSize;
+
+        while (sizeAndUnit.size > THRESHOLD) {
+            sizeAndUnit.size /= 1024;
+            orderOfMagnitude++;
+        }
+
+        sizeAndUnit.unit = getSizeUnit(orderOfMagnitude);
+        return sizeAndUnit;
+    }
+
+    private SizeUnit getSizeUnit(int orderOfMagnitude) {
+        SizeUnit unit;
+        switch (orderOfMagnitude) {
+            case 0:
+                unit = SizeUnit.B;
+                break;
+            case 1:
+                unit = SizeUnit.KB;
+                break;
+            case 2:
+                unit = SizeUnit.MB;
+                break;
+            case 3:
+                unit = SizeUnit.GB;
+                break;
+            case 4:
+                unit = SizeUnit.TB;
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported order of magnitude "
+                                + orderOfMagnitude
+                                + ". Size's order of magnitue can be a value between 0(Bytes) and 4(TB)");
+        }
+        return unit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FragmentsStatsTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FragmentsStatsTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FragmentsStatsTest.java
new file mode 100644
index 0000000..4e7eca0
--- /dev/null
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FragmentsStatsTest.java
@@ -0,0 +1,90 @@
+package org.apache.hawq.pxf.api;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hawq.pxf.api.FragmentsStats.SizeUnit;
+import org.junit.Test;
+
+public class FragmentsStatsTest {
+
+    @Test
+    public void ctorSizeByte() {
+        ctorSizeTest(10, 100, 100, SizeUnit.B, 1000000, 1000000, SizeUnit.B);
+    }
+
+    @Test
+    public void ctorSizeKB() {
+        ctorSizeTest(40, 50, 50, SizeUnit.B, (long) Math.pow(2, 32), (long) Math.pow(2, 22),
+                SizeUnit.KB);
+    }
+
+    @Test
+    public void ctorSizeMB() {
+        ctorSizeTest(20, 50, 50, SizeUnit.B, (long) Math.pow(2, 40), (long) Math.pow(2, 20),
+                SizeUnit.MB);
+    }
+
+    @Test
+    public void ctorSizeGB() {
+        ctorSizeTest(25, 1000000, 1000000, SizeUnit.B, (long) Math.pow(6, 20),
+                (long) Math.pow(6, 20) / (long) Math.pow(2, 30), SizeUnit.GB);
+    }
+
+    @Test
+    public void ctorSizeTB() {
+        ctorSizeTest(25, 20000000, 20000000, SizeUnit.B, (long) Math.pow(5, 30),
+                (long) Math.pow(5, 30) / (long) Math.pow(2, 40), SizeUnit.TB);
+    }
+
+    @Test
+    public void ctorSize0() {
+        ctorSizeTest(0, 0, 0, SizeUnit.B, 0, 0, SizeUnit.B);
+    }
+
+    @Test
+    public void dataToJSON() throws IOException {
+        FragmentsStats fragmentsStats = new FragmentsStats(25, 20000000, (long) Math.pow(5, 30));
+        String json = FragmentsStats.dataToJSON(fragmentsStats);
+        String expectedJson = "{\"PXFFragmentsStats\":" +
+                "{\"fragmentsNumber\":" + fragmentsStats.getFragmentsNumber() +
+                ",\"firstFragmentSize\":" +
+                "{\"size\":" + fragmentsStats.getFirstFragmentSize().getSize() +
+                ",\"unit\":\"" + fragmentsStats.getFirstFragmentSize().getUnit() + "\"}" +
+                ",\"totalSize\":" +
+                "{\"size\":" + fragmentsStats.getTotalSize().getSize() +
+                ",\"unit\":\"" + fragmentsStats.getTotalSize().getUnit() + "\"}" +
+                "}}";
+        assertEquals(expectedJson, json);
+    }
+
+    @Test
+    public void dataToString() {
+        FragmentsStats fragmentsStats = new FragmentsStats(25, 2000000000, (long) Math.pow(5, 30));
+        String path = "la la la";
+        String str = FragmentsStats.dataToString(fragmentsStats, path);
+        String expected =  "Statistics information for \"" + path + "\" "
+                + " Number of Fragments: " + 25
+                + ", first Fragment size: " + 1953125 + "KB"
+                + ", total size: " + 8388607 + "TB";
+        assertEquals(expected, str);
+    }
+
+    private void ctorSizeTest(long fragsNum, long firstFragSize,
+                              long expectedFirstFragSize,
+                              SizeUnit expectedFirstFragSizeUnit, long totalSize,
+                              long expectedTotalSize,
+                              SizeUnit expectedTotalSizeUnit) {
+        FragmentsStats fragmentsStats = new FragmentsStats(fragsNum,
+                firstFragSize, totalSize);
+        assertEquals(fragsNum, fragmentsStats.getFragmentsNumber());
+        assertEquals(expectedFirstFragSize,
+                fragmentsStats.getFirstFragmentSize().size);
+        assertEquals(expectedFirstFragSizeUnit,
+                fragmentsStats.getFirstFragmentSize().unit);
+        assertEquals(expectedTotalSize, fragmentsStats.getTotalSize().size);
+        assertEquals(expectedTotalSizeUnit,
+                fragmentsStats.getTotalSize().unit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
index 36ae60c..1f1afae 100644
--- a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
@@ -2,10 +2,10 @@ package org.apache.hawq.pxf.plugins.hbase;
 
 import org.apache.hawq.pxf.api.Fragment;
 import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseLookupTable;
 import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseUtilities;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.*;
@@ -32,11 +32,24 @@ public class HBaseDataFragmenter extends Fragmenter {
     private Admin hbaseAdmin;
     private Connection connection;
 
+    /**
+     * Constructor for HBaseDataFragmenter.
+     *
+     * @param inConf input data such as which HBase table to scan
+     */
     public HBaseDataFragmenter(InputData inConf) {
         super(inConf);
     }
 
     /**
+     * Returns statistics for HBase table. Currently it's not implemented.
+     */
+    @Override
+    public FragmentsStats getFragmentsStats() throws Exception {
+        throw new UnsupportedOperationException("ANALYZE for HBase plugin is not supported");
+    }
+
+    /**
      * Returns list of fragments containing all of the
      * HBase's table data.
      * Lookup table information with mapping between

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
index c80244a..49a4d39 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
 /**
  * Analyzer class for HDFS data resources
  *
- * Given an HDFS data source (a file, directory, or wild card pattern)
- * return statistics about it (number of blocks, number of tuples, etc.)
+ * Given an HDFS data source (a file, directory, or wild card pattern) return
+ * statistics about it (number of blocks, number of tuples, etc.)
  */
 public class HdfsAnalyzer extends Analyzer {
     private JobConf jobConf;
@@ -49,28 +49,31 @@ public class HdfsAnalyzer extends Analyzer {
      * Collects a number of basic statistics based on an estimate. Statistics
      * are: number of records, number of hdfs blocks and hdfs block size.
      *
-     * @param datapath path is a data source URI that can appear as a file
-     *        name, a directory name or a wildcard pattern
+     * @param datapath path is a data source URI that can appear as a file name,
+     *            a directory name or a wildcard pattern
      * @return statistics in JSON format
-     * @throws Exception if path is wrong, its metadata cannot be retrieved
-     *                    from file system, or if scanning the first block
-     *                    using the accessor failed
+     * @throws Exception if path is wrong, its metadata cannot be retrieved from
+     *             file system, or if scanning the first block using the
+     *             accessor failed
      */
     @Override
     public AnalyzerStats getEstimatedStats(String datapath) throws Exception {
         long blockSize = 0;
         long numberOfBlocks;
+        long dataSize = 0;
         Path path = new Path(HdfsUtilities.absoluteDataPath(datapath));
 
         ArrayList<InputSplit> splits = getSplits(path);
 
         for (InputSplit split : splits) {
             FileSplit fsp = (FileSplit) split;
-            Path filePath = fsp.getPath();
-            FileStatus fileStatus = fs.getFileStatus(filePath);
-            if (fileStatus.isFile()) {
-                blockSize = fileStatus.getBlockSize();
-                break;
+            dataSize += fsp.getLength();
+            if (blockSize == 0) {
+                Path filePath = fsp.getPath();
+                FileStatus fileStatus = fs.getFileStatus(filePath);
+                if (fileStatus.isFile()) {
+                    blockSize = fileStatus.getBlockSize();
+                }
             }
         }
 
@@ -80,23 +83,38 @@ public class HdfsAnalyzer extends Analyzer {
         }
         numberOfBlocks = splits.size();
 
-
+        /*
+         * The estimate of the number of tuples in table is based on the
+         * actual number of tuples in the first block, multiplied by its
+         * size compared to the size of the whole data to be read.
+         * The calculation:
+         * Ratio of tuples to size = number of tuples in first block / first block size.
+         * Total of tuples = ratio * number of blocks * total block size.
+         */
         long numberOfTuplesInBlock = getNumberOfTuplesInBlock(splits);
-        AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks, numberOfTuplesInBlock * numberOfBlocks);
+        long numberOfTuples = 0;
+        if (!splits.isEmpty()) {
+            long blockLength = splits.get(0).getLength();
+            numberOfTuples = (long) Math.floor((((double) numberOfTuplesInBlock / blockLength) * (dataSize)));
+        }
+        // AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks,
+        AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks,
+                numberOfTuples);
 
-        //print files size to log when in debug level
+        // print files size to log when in debug level
         Log.debug(AnalyzerStats.dataToString(stats, path.toString()));
 
         return stats;
     }
 
     /**
-     * Calculates the number of tuples in a split (block).
-     * Reads one block from HDFS. Exception during reading will
-     * filter upwards and handled in AnalyzerResource
+     * Calculates the number of tuples in a split (block). Reads one block from
+     * HDFS. Exception during reading will filter upwards and handled in
+     * AnalyzerResource
      */
-    private long getNumberOfTuplesInBlock(ArrayList<InputSplit> splits) throws Exception {
-        long tuples = -1; /* default  - if we are not able to read data */
+    private long getNumberOfTuplesInBlock(ArrayList<InputSplit> splits)
+            throws Exception {
+        long tuples = -1; /* default - if we are not able to read data */
         ReadAccessor accessor;
 
         if (splits.isEmpty()) {
@@ -104,8 +122,8 @@ public class HdfsAnalyzer extends Analyzer {
         }
 
         /*
-         * metadata information includes: file split's
-         * start, length and hosts (locations).
+         * metadata information includes: file split's start, length and hosts
+         * (locations).
          */
         FileSplit firstSplit = (FileSplit) splits.get(0);
         byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(firstSplit);
@@ -121,6 +139,7 @@ public class HdfsAnalyzer extends Analyzer {
 
             accessor.closeForRead();
         }
+        Log.debug("number of tuples in first block: " + tuples);
 
         return tuples;
     }
@@ -133,11 +152,11 @@ public class HdfsAnalyzer extends Analyzer {
 
         // remove empty splits
         if (splits != null) {
-	        for (InputSplit split : splits) {
-	        	if (split.getLength() > 0) {
-	        		result.add(split);
-	        	}
-	        }
+            for (InputSplit split : splits) {
+                if (split.getLength() > 0) {
+                    result.add(split);
+                }
+            }
         }
 
         return result;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
index 5c81ef8..cccc75a 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
@@ -2,9 +2,11 @@ package org.apache.hawq.pxf.plugins.hdfs;
 
 import org.apache.hawq.pxf.api.Fragment;
 import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.PxfInputFormat;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -12,6 +14,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -43,19 +46,11 @@ public class HdfsDataFragmenter extends Fragmenter {
     @Override
     public List<Fragment> getFragments() throws Exception {
         String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
-        InputSplit[] splits = getSplits(new Path(absoluteDataPath));
+        List<InputSplit> splits = getSplits(new Path(absoluteDataPath));
 
-        for (InputSplit split : splits != null ? splits : new InputSplit[] {}) {
+        for (InputSplit split : splits) {
             FileSplit fsp = (FileSplit) split;
 
-            /*
-             * HD-2547: If the file is empty, an empty split is returned: no
-             * locations and no length.
-             */
-            if (fsp.getLength() <= 0) {
-                continue;
-            }
-
             String filepath = fsp.getPath().toUri().getPath();
             String[] hosts = fsp.getLocations();
 
@@ -71,9 +66,40 @@ public class HdfsDataFragmenter extends Fragmenter {
         return fragments;
     }
 
-    private InputSplit[] getSplits(Path path) throws IOException {
-        PxfInputFormat format = new PxfInputFormat();
+    @Override
+    public FragmentsStats getFragmentsStats() throws Exception {
+        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+        ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
+
+        if (splits.isEmpty()) {
+            return new FragmentsStats(0, 0, 0);
+        }
+        long totalSize = 0;
+        for (InputSplit split: splits) {
+            totalSize += split.getLength();
+        }
+        InputSplit firstSplit = splits.get(0);
+        return new FragmentsStats(splits.size(), firstSplit.getLength(), totalSize);
+    }
+
+    private ArrayList<InputSplit> getSplits(Path path) throws IOException {
+        PxfInputFormat fformat = new PxfInputFormat();
         PxfInputFormat.setInputPaths(jobConf, path);
-        return format.getSplits(jobConf, 1);
+        InputSplit[] splits = fformat.getSplits(jobConf, 1);
+        ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+        /*
+         * HD-2547: If the file is empty, an empty split is returned: no
+         * locations and no length.
+         */
+        if (splits != null) {
+            for (InputSplit split : splits) {
+                if (split.getLength() > 0) {
+                    result.add(split);
+                }
+            }
+        }
+
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
index efce79f..aa8fc84 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
@@ -63,7 +63,7 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
      * Creates a OneRow object from the singleton list.
      */
     @Override
-    public OneRow setFields(List<OneField> record) throws Exception {
+    public OneRow setFields(List<OneField> record) {
         if (((byte[]) record.get(0).val).length == 0) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 881aeac..1408a78 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -24,10 +24,10 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
-
 import org.apache.hawq.pxf.api.FilterParser;
 import org.apache.hawq.pxf.api.Fragment;
 import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.api.Metadata;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
@@ -443,4 +443,12 @@ public class HiveDataFragmenter extends Fragmenter {
 
         return true;
     }
+
+    /**
+     * Returns statistics for Hive table. Currently it's not implemented.
+     */
+    @Override
+    public FragmentsStats getFragmentsStats() throws Exception {
+        throw new UnsupportedOperationException("ANALYZE for Hive plugin is not supported");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index 5de36b2..0c09e9b 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -1,5 +1,6 @@
 package org.apache.hawq.pxf.plugins.hive;
 
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.UserDataException;
 import org.apache.hawq.pxf.api.io.DataType;
@@ -259,4 +260,12 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
 
         return userData.getBytes();
     }
+
+    /**
+     * Returns statistics for Hive table. Currently it's not implemented.
+     */
+    @Override
+    public FragmentsStats getFragmentsStats() throws Exception {
+        throw new UnsupportedOperationException("ANALYZE for HiveRc and HiveText plugins is not supported");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
index 6784916..befa07a 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
@@ -6,12 +6,12 @@ import org.apache.hawq.pxf.service.utilities.Utilities;
 
 /*
  * Factory class for creation of Analyzer objects. The actual Analyzer object is "hidden" behind
- * an Analyzer abstract class which is returned by the AnalyzerFactory. 
+ * an Analyzer abstract class which is returned by the AnalyzerFactory.
  */
 public class AnalyzerFactory {
     static public Analyzer create(InputData inputData) throws Exception {
     	String analyzerName = inputData.getAnalyzer();
-    	
+
         return (Analyzer) Utilities.createAnyInstance(InputData.class, analyzerName, inputData);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
index 8743d87..2c9aa27 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
@@ -4,11 +4,10 @@ import org.apache.hawq.pxf.service.io.Writable;
 
 import java.io.DataInputStream;
 
-/*
- * Bridge interface - defines the interface of the Bridge classes.
- * Any Bridge class acts as an iterator over Hadoop stored data, and 
- * should implement getNext (for reading) or setNext (for writing) 
- * for handling accessed data.
+/**
+ * Bridge interface - defines the interface of the Bridge classes. Any Bridge
+ * class acts as an iterator over Hadoop stored data, and should implement
+ * getNext (for reading) or setNext (for writing) for handling accessed data.
  */
 public interface Bridge {
     boolean beginIteration() throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
index 99255fa..7e5900c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
@@ -10,10 +10,15 @@ import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException;
 import org.apache.hawq.pxf.service.io.Text;
 import org.apache.hawq.pxf.service.io.Writable;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.ObjectUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.lang.reflect.Array;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.hawq.pxf.api.io.DataType.TEXT;
@@ -29,9 +34,17 @@ import static org.apache.hawq.pxf.api.io.DataType.TEXT;
 public class BridgeOutputBuilder {
     private ProtocolData inputData;
     private Writable output = null;
+    private LinkedList<Writable> outputList = null;
+    private Writable partialLine = null;
     private GPDBWritable errorRecord = null;
     private int[] schema;
     private String[] colNames;
+    private boolean samplingEnabled = false;
+    private boolean isPartialLine = false;
+
+    private static final byte DELIM = 10; /* (byte)'\n'; */
+
+    private static final Log LOG = LogFactory.getLog(BridgeOutputBuilder.class);
 
     /**
      * Constructs a BridgeOutputBuilder.
@@ -41,7 +54,9 @@ public class BridgeOutputBuilder {
      */
     public BridgeOutputBuilder(ProtocolData input) {
         inputData = input;
+        outputList = new LinkedList<Writable>();
         makeErrorRecord();
+        samplingEnabled = (inputData.getStatsSampleRatio() > 0);
     }
 
     /**
@@ -87,18 +102,29 @@ public class BridgeOutputBuilder {
      * Translates recFields (obtained from the Resolver) into an output record.
      *
      * @param recFields record fields to be serialized
-     * @return Writable object with serialized row
+     * @return list of Writable objects with serialized row
      * @throws BadRecordException if building the output record failed
      */
-    public Writable makeOutput(List<OneField> recFields)
+    public LinkedList<Writable> makeOutput(List<OneField> recFields)
             throws BadRecordException {
         if (output == null && inputData.outputFormat() == OutputFormat.BINARY) {
             makeGPDBWritableOutput();
         }
 
+        outputList.clear();
+
         fillOutputRecord(recFields);
 
-        return output;
+        return outputList;
+    }
+
+    /**
+     * Returns whether or not this is a partial line.
+     *
+     * @return true for a partial line
+     */
+    public Writable getPartialLine() {
+        return partialLine;
     }
 
     /**
@@ -167,6 +193,8 @@ public class BridgeOutputBuilder {
 
             fillOneGPDBWritableField(current, i);
         }
+
+        outputList.add(output);
     }
 
     /**
@@ -201,7 +229,8 @@ public class BridgeOutputBuilder {
      * Fills a Text object based on recFields.
      *
      * @param recFields record fields
-     * @throws BadRecordException if text formatted record has more than one field
+     * @throws BadRecordException if text formatted record has more than one
+     *             field
      */
     void fillText(List<OneField> recFields) throws BadRecordException {
         /*
@@ -216,10 +245,66 @@ public class BridgeOutputBuilder {
         int type = fld.type;
         Object val = fld.val;
         if (DataType.get(type) == DataType.BYTEA) {// from LineBreakAccessor
-            output = new BufferWritable((byte[]) val);
+            if (samplingEnabled) {
+                convertTextDataToLines((byte[]) val);
+            } else {
+                output = new BufferWritable((byte[]) val);
+                outputList.add(output); // TODO break output into lines
+            }
         } else { // from QuotedLineBreakAccessor
             String textRec = (String) val;
             output = new Text(textRec + "\n");
+            outputList.add(output);
+        }
+    }
+
+    /**
+     * Breaks raw bytes into lines. Used only for sampling.
+     *
+     * When sampling a data source, we have to make sure that
+     * we deal with actual rows (lines) and not bigger chunks of
+     * data such as used by LineBreakAccessor for performance.
+     * The input byte array is broken into lines, each one stored in
+     * the outputList. In case the read data doesn't end with a line delimiter,
+     * which can happen when reading chunks of bytes, the partial line is
+     * stored separately, and is being completed when reading the next chunk of data.
+     *
+     * @param val input raw data to break into lines
+     */
+    void convertTextDataToLines(byte[] val) {
+        int len = val.length;
+        int start = 0;
+        int end = 0;
+        byte[] line;
+        BufferWritable writable;
+
+        while (start < len) {
+            end = ArrayUtils.indexOf(val, DELIM, start);
+            if (end == ArrayUtils.INDEX_NOT_FOUND) {
+                // data finished in the middle of the line
+                end = len;
+                isPartialLine = true;
+            } else {
+                end++; // include the DELIM character
+                isPartialLine = false;
+            }
+            line = Arrays.copyOfRange(val, start, end);
+
+            if (partialLine != null) {
+                // partial data was completed
+                ((BufferWritable) partialLine).append(line);
+                writable = (BufferWritable) partialLine;
+                partialLine = null;
+            } else {
+                writable = new BufferWritable(line);
+            }
+
+            if (isPartialLine) {
+                partialLine = writable;
+            } else {
+                outputList.add(writable);
+            }
+            start = end;
         }
     }
 
@@ -228,32 +313,33 @@ public class BridgeOutputBuilder {
      *
      * @param oneField field
      * @param colIdx column index
-     * @throws BadRecordException if field type is not supported or doesn't match the schema
+     * @throws BadRecordException if field type is not supported or doesn't
+     *             match the schema
      */
     void fillOneGPDBWritableField(OneField oneField, int colIdx)
             throws BadRecordException {
         int type = oneField.type;
         Object val = oneField.val;
-        GPDBWritable GPDBoutput = (GPDBWritable) output;
+        GPDBWritable gpdbOutput = (GPDBWritable) output;
         try {
             switch (DataType.get(type)) {
                 case INTEGER:
-                    GPDBoutput.setInt(colIdx, (Integer) val);
+                    gpdbOutput.setInt(colIdx, (Integer) val);
                     break;
                 case FLOAT8:
-                    GPDBoutput.setDouble(colIdx, (Double) val);
+                    gpdbOutput.setDouble(colIdx, (Double) val);
                     break;
                 case REAL:
-                    GPDBoutput.setFloat(colIdx, (Float) val);
+                    gpdbOutput.setFloat(colIdx, (Float) val);
                     break;
                 case BIGINT:
-                    GPDBoutput.setLong(colIdx, (Long) val);
+                    gpdbOutput.setLong(colIdx, (Long) val);
                     break;
                 case SMALLINT:
-                    GPDBoutput.setShort(colIdx, (Short) val);
+                    gpdbOutput.setShort(colIdx, (Short) val);
                     break;
                 case BOOLEAN:
-                    GPDBoutput.setBoolean(colIdx, (Boolean) val);
+                    gpdbOutput.setBoolean(colIdx, (Boolean) val);
                     break;
                 case BYTEA:
                     byte[] bts = null;
@@ -264,7 +350,7 @@ public class BridgeOutputBuilder {
                             bts[j] = Array.getByte(val, j);
                         }
                     }
-                    GPDBoutput.setBytes(colIdx, bts);
+                    gpdbOutput.setBytes(colIdx, bts);
                     break;
                 case VARCHAR:
                 case BPCHAR:
@@ -273,7 +359,8 @@ public class BridgeOutputBuilder {
                 case NUMERIC:
                 case TIMESTAMP:
                 case DATE:
-                    GPDBoutput.setString(colIdx, ObjectUtils.toString(val, null));
+                    gpdbOutput.setString(colIdx,
+                            ObjectUtils.toString(val, null));
                     break;
                 default:
                     String valClassName = (val != null) ? val.getClass().getSimpleName()

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
index 47f883c..e724467 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
@@ -39,7 +39,7 @@ public class FragmentsResponse implements StreamingOutput {
      * Serializes a fragments list in JSON,
      * To be used as the result string for HAWQ.
      * An example result is as follows:
-     * {@code {"PXFFragments":[{"replicas":["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"],"sourceName":"text2.csv", "index":"0", "metadata":<base64 metadata for fragment>, "userData":"<data_specific_to_third_party_fragmenter>"},{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com"],"sourceName":"text_data.csv","index":"0","metadata":<base64 metadata for fragment>,"userData":"<data_specific_to_third_party_fragmenter>"}]}}
+     * <code>{"PXFFragments":[{"replicas":["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"],"sourceName":"text2.csv", "index":"0", "metadata":<base64 metadata for fragment>, "userData":"<data_specific_to_third_party_fragmenter>"},{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com"],"sourceName":"text_data.csv","index":"0","metadata":<base64 metadata for fragment>,"userData":"<data_specific_to_third_party_fragmenter>"}]}</code>
      */
     @Override
     public void write(OutputStream output) throws IOException,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
index 0e9c47f..107a60c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
@@ -4,7 +4,6 @@ import org.apache.hawq.pxf.api.Fragment;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
index 9497c6c..6a93833 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
@@ -14,58 +14,81 @@ import org.apache.commons.logging.LogFactory;
 
 import java.io.*;
 import java.nio.charset.CharacterCodingException;
+import java.util.LinkedList;
 import java.util.zip.ZipException;
 
-/*
- * ReadBridge class creates appropriate accessor and resolver.
- * It will then create the correct output conversion
- * class (e.g. Text or GPDBWritable) and get records from accessor,
- * let resolver deserialize them and reserialize them using the
- * output conversion class.
- *
- * The class handles BadRecordException and other exception type
- * and marks the record as invalid for GPDB.
+/**
+ * ReadBridge class creates appropriate accessor and resolver. It will then
+ * create the correct output conversion class (e.g. Text or GPDBWritable) and
+ * get records from accessor, let resolver deserialize them and reserialize them
+ * using the output conversion class. <br>
+ * The class handles BadRecordException and other exception type and marks the
+ * record as invalid for HAWQ.
  */
 public class ReadBridge implements Bridge {
     ReadAccessor fileAccessor = null;
     ReadResolver fieldsResolver = null;
     BridgeOutputBuilder outputBuilder = null;
+    LinkedList<Writable> outputQueue = null;
 
-    private Log Log;
+    private static final Log Log = LogFactory.getLog(ReadBridge.class);
 
-    /*
-     * C'tor - set the implementation of the bridge
+    /**
+     * C'tor - set the implementation of the bridge.
+     *
+     * @param protData input containing accessor and resolver names
+     * @throws Exception if accessor or resolver can't be instantiated
      */
     public ReadBridge(ProtocolData protData) throws Exception {
         outputBuilder = new BridgeOutputBuilder(protData);
-        Log = LogFactory.getLog(ReadBridge.class);
+        outputQueue = new LinkedList<Writable>();
         fileAccessor = getFileAccessor(protData);
         fieldsResolver = getFieldsResolver(protData);
     }
 
-    /*
-     * Accesses the underlying HDFS file
+    /**
+     * Accesses the underlying HDFS file.
      */
     @Override
     public boolean beginIteration() throws Exception {
         return fileAccessor.openForRead();
     }
 
-    /*
-     * Fetch next object from file and turn it into a record that the GPDB backend can process
+    /**
+     * Fetches next object from file and turn it into a record that the HAWQ
+     * backend can process.
      */
     @Override
     public Writable getNext() throws Exception {
-        Writable output;
+        Writable output = null;
         OneRow onerow = null;
+
+        if (!outputQueue.isEmpty()) {
+            return outputQueue.pop();
+        }
+
         try {
-            onerow = fileAccessor.readNextObject();
-            if (onerow == null) {
-                fileAccessor.closeForRead();
-                return null;
-            }
+            while (outputQueue.isEmpty()) {
+                onerow = fileAccessor.readNextObject();
+                if (onerow == null) {
+                    fileAccessor.closeForRead();
+                    output = outputBuilder.getPartialLine();
+                    if (output != null) {
+                        Log.warn("A partial record in the end of the fragment");
+                    }
+                    // if there is a partial line, return it now, otherwise it
+                    // will return null
+                    return output;
+                }
 
-            output = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+                // we checked before that outputQueue is empty, so we can
+                // override it.
+                outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+                if (!outputQueue.isEmpty()) {
+                    output = outputQueue.pop();
+                    break;
+                }
+            }
         } catch (IOException ex) {
             if (!isDataException(ex)) {
                 fileAccessor.closeForRead();
@@ -78,7 +101,8 @@ public class ReadBridge implements Bridge {
                 row_info = onerow.toString();
             }
             if (ex.getCause() != null) {
-                Log.debug("BadRecordException " + ex.getCause().toString() + ": " + row_info);
+                Log.debug("BadRecordException " + ex.getCause().toString()
+                        + ": " + row_info);
             } else {
                 Log.debug(ex.toString() + ": " + row_info);
             }
@@ -91,27 +115,34 @@ public class ReadBridge implements Bridge {
         return output;
     }
 
-    public static ReadAccessor getFileAccessor(InputData inputData) throws Exception {
-        return (ReadAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData);
+    public static ReadAccessor getFileAccessor(InputData inputData)
+            throws Exception {
+        return (ReadAccessor) Utilities.createAnyInstance(InputData.class,
+                inputData.getAccessor(), inputData);
     }
 
-    public static ReadResolver getFieldsResolver(InputData inputData) throws Exception {
-        return (ReadResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData);
+    public static ReadResolver getFieldsResolver(InputData inputData)
+            throws Exception {
+        return (ReadResolver) Utilities.createAnyInstance(InputData.class,
+                inputData.getResolver(), inputData);
     }
 
     /*
-     * There are many exceptions that inherit IOException. Some of them like EOFException are generated
-     * due to a data problem, and not because of an IO/connection problem as the father IOException
-     * might lead us to believe. For example, an EOFException will be thrown while fetching a record
-     * from a sequence file, if there is a formatting problem in the record. Fetching record from
-     * the sequence-file is the responsibility of the accessor so the exception will be thrown from the
-     * accessor. We identify this cases by analyzing the exception type, and when we discover that the
-     * actual problem was a data problem, we return the errorOutput GPDBWritable.
+     * There are many exceptions that inherit IOException. Some of them like
+     * EOFException are generated due to a data problem, and not because of an
+     * IO/connection problem as the father IOException might lead us to believe.
+     * For example, an EOFException will be thrown while fetching a record from
+     * a sequence file, if there is a formatting problem in the record. Fetching
+     * record from the sequence-file is the responsibility of the accessor so
+     * the exception will be thrown from the accessor. We identify this cases by
+     * analyzing the exception type, and when we discover that the actual
+     * problem was a data problem, we return the errorOutput GPDBWritable.
      */
     private boolean isDataException(IOException ex) {
-        return (ex instanceof EOFException || ex instanceof CharacterCodingException ||
-                ex instanceof CharConversionException || ex instanceof UTFDataFormatException ||
-                ex instanceof ZipException);
+        return (ex instanceof EOFException
+                || ex instanceof CharacterCodingException
+                || ex instanceof CharConversionException
+                || ex instanceof UTFDataFormatException || ex instanceof ZipException);
     }
 
     @Override
@@ -121,7 +152,8 @@ public class ReadBridge implements Bridge {
 
     @Override
     public boolean isThreadSafe() {
-        boolean result = ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe();
+        boolean result = ((Plugin) fileAccessor).isThreadSafe()
+                && ((Plugin) fieldsResolver).isThreadSafe();
         Log.debug("Bridge is " + (result ? "" : "not ") + "thread safe");
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
new file mode 100644
index 0000000..66ee053
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
@@ -0,0 +1,112 @@
+package org.apache.hawq.pxf.service;
+
+import java.io.DataInputStream;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+/**
+ * ReadSamplingBridge wraps a ReadBridge, and returns only some of the output
+ * records, based on a ratio sample. The sample to pass or discard a record is
+ * done after all of the processing is completed (
+ * {@code accessor -> resolver -> output builder}) to make sure there are no
+ * chunks of data instead of single records. <br>
+ * The goal is to get as uniform as possible sampling. This is achieved by
+ * creating a bit map matching the precision of the sampleRatio, so that for a
+ * ratio of 0.034, a bit-map of 1000 bits will be created, and 34 bits will be
+ * set. This map is matched against each read record, discarding ones with a 0
+ * bit and continuing until a 1 bit record is read.
+ */
+public class ReadSamplingBridge implements Bridge {
+
+    ReadBridge bridge;
+
+    float sampleRatio;
+    BitSet sampleBitSet;
+    int bitSetSize;
+    int sampleSize;
+    int curIndex;
+
+    static private Log Log = LogFactory.getLog(ReadSamplingBridge.class);;
+
+    /**
+     * C'tor - set the implementation of the bridge.
+     *
+     * @param protData input containing sampling ratio
+     * @throws Exception if the sampling ratio is wrong
+     */
+    public ReadSamplingBridge(ProtocolData protData) throws Exception {
+        bridge = new ReadBridge(protData);
+
+        this.sampleRatio = protData.getStatsSampleRatio();
+        if (sampleRatio < 0.0001 || sampleRatio > 1.0) {
+            throw new IllegalArgumentException(
+                    "sampling ratio must be a value between 0.0001 and 1.0. "
+                            + "(value = " + sampleRatio + ")");
+        }
+
+        calculateBitSetSize();
+
+        this.sampleBitSet = AnalyzeUtils.generateSamplingBitSet(bitSetSize,
+                sampleSize);
+        this.curIndex = 0;
+    }
+
+    private void calculateBitSetSize() {
+
+        sampleSize = (int) (sampleRatio * 10000);
+        bitSetSize = 10000;
+
+        while ((bitSetSize > 100) && (sampleSize % 10 == 0)) {
+            bitSetSize /= 10;
+            sampleSize /= 10;
+        }
+        Log.debug("bit set size = " + bitSetSize + " sample size = "
+                + sampleSize);
+    }
+
+    /**
+     * Fetches next sample, according to the sampling ratio.
+     */
+    @Override
+    public Writable getNext() throws Exception {
+        Writable output = bridge.getNext();
+
+        // sample - if bit is false, advance to the next object
+        while (!sampleBitSet.get(curIndex)) {
+
+            if (output == null) {
+                break;
+            }
+            incIndex();
+            output = bridge.getNext();
+        }
+
+        incIndex();
+        return output;
+    }
+
+    private void incIndex() {
+        curIndex = (++curIndex) % bitSetSize;
+    }
+
+    @Override
+    public boolean beginIteration() throws Exception {
+        return bridge.beginIteration();
+    }
+
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+        return bridge.setNext(inputStream);
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return bridge.isThreadSafe();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
index 34ed316..c96c17a 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
@@ -27,17 +27,18 @@ public class WriteBridge implements Bridge {
      * C'tor - set the implementation of the bridge
      */
     public WriteBridge(ProtocolData protocolData) throws Exception {
-    	
-        inputBuilder = new BridgeInputBuilder(protocolData);        
-        /* plugins accept InputData paramaters */
+
+        inputBuilder = new BridgeInputBuilder(protocolData);
+        /* plugins accept InputData parameters */
         fileAccessor = getFileAccessor(protocolData);
         fieldsResolver = getFieldsResolver(protocolData);
-        
+
     }
 
     /*
      * Accesses the underlying HDFS file
      */
+    @Override
     public boolean beginIteration() throws Exception {
         return fileAccessor.openForWrite();
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
index e74c88b..afe7917 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
@@ -54,4 +54,25 @@ public class BufferWritable implements Writable {
                 "BufferWritable.readFields() is not implemented");
     }
 
+    /**
+     * Appends given app's buffer to existing buffer.
+     * <br>
+     * Not efficient - requires copying both this and the appended buffer.
+     *
+     * @param app buffer to append
+     */
+    public void append(byte[] app) {
+        if (buf == null) {
+            buf = app;
+            return;
+        }
+        if (app == null) {
+            return;
+        }
+
+        byte[] newbuf = new byte[buf.length + app.length];
+        System.arraycopy(buf, 0, newbuf, 0, buf.length);
+        System.arraycopy(app, 0, newbuf, buf.length, app.length);
+        buf = newbuf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 588845d..cda8317 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.hawq.pxf.service.Bridge;
 import org.apache.hawq.pxf.service.ReadBridge;
+import org.apache.hawq.pxf.service.ReadSamplingBridge;
 import org.apache.hawq.pxf.service.io.Writable;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
 import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
@@ -36,28 +37,32 @@ public class BridgeResource extends RestResource {
 
     private static Log Log = LogFactory.getLog(BridgeResource.class);
     /**
-     * Lock is needed here in the case of a non-thread-safe plugin.
-     * Using synchronized methods is not enough because the bridge work
-     * is called by jetty ({@link StreamingOutput}), after we are getting
-     * out of this class's context.
+     * Lock is needed here in the case of a non-thread-safe plugin. Using
+     * synchronized methods is not enough because the bridge work is called by
+     * jetty ({@link StreamingOutput}), after we are getting out of this class's
+     * context.
      * <p/>
-     * BRIDGE_LOCK is accessed through lock() and unlock() functions, based on the
-     * isThreadSafe parameter that is determined by the bridge.
+     * BRIDGE_LOCK is accessed through lock() and unlock() functions, based on
+     * the isThreadSafe parameter that is determined by the bridge.
      */
     private static final ReentrantLock BRIDGE_LOCK = new ReentrantLock();
 
     public BridgeResource() {
     }
 
-    /*
-     * Used to be HDFSReader. Creates a bridge instance and iterates over
-     * its records, printing it out to outgoing stream.
-     * Outputs GPDBWritable.
+    /**
+     * Used to be HDFSReader. Creates a bridge instance and iterates over its
+     * records, printing it out to outgoing stream. Outputs GPDBWritable or
+     * Text.
      *
      * Parameters come through HTTP header.
      *
-     * @param servletContext Servlet context contains attributes required by SecuredHDFS
+     * @param servletContext Servlet context contains attributes required by
+     *            SecuredHDFS
      * @param headers Holds HTTP headers from request
+     * @return response object containing stream that will output records
+     * @throws Exception in case of wrong request parameters, or failure to
+     *             initialize bridge
      */
     @GET
     @Produces(MediaType.APPLICATION_OCTET_STREAM)
@@ -70,17 +75,24 @@ public class BridgeResource extends RestResource {
 
         ProtocolData protData = new ProtocolData(params);
         SecuredHDFS.verifyToken(protData, servletContext);
-        Bridge bridge = new ReadBridge(protData);
+        Bridge bridge;
+        float sampleRatio = protData.getStatsSampleRatio();
+        if (sampleRatio > 0) {
+            bridge = new ReadSamplingBridge(protData);
+        } else {
+            bridge = new ReadBridge(protData);
+        }
         String dataDir = protData.getDataSource();
         // THREAD-SAFE parameter has precedence
         boolean isThreadSafe = protData.isThreadSafe() && bridge.isThreadSafe();
-        Log.debug("Request for " + dataDir + " will be handled " +
-                (isThreadSafe ? "without" : "with") + " synchronization");
+        Log.debug("Request for " + dataDir + " will be handled "
+                + (isThreadSafe ? "without" : "with") + " synchronization");
 
         return readResponse(bridge, protData, isThreadSafe);
     }
 
-    Response readResponse(final Bridge bridge, ProtocolData protData, final boolean threadSafe) throws Exception {
+    Response readResponse(final Bridge bridge, ProtocolData protData,
+                          final boolean threadSafe) {
         final int fragment = protData.getDataFragment();
         final String dataDir = protData.getDataSource();
 
@@ -89,7 +101,8 @@ public class BridgeResource extends RestResource {
         // output stream
         final StreamingOutput streaming = new StreamingOutput() {
             @Override
-            public void write(final OutputStream out) throws IOException, WebApplicationException {
+            public void write(final OutputStream out) throws IOException,
+                    WebApplicationException {
                 long recordCount = 0;
 
                 if (!threadSafe) {
@@ -103,20 +116,26 @@ public class BridgeResource extends RestResource {
 
                     Writable record;
                     DataOutputStream dos = new DataOutputStream(out);
-                    Log.debug("Starting streaming fragment " + fragment + " of resource " + dataDir);
+                    Log.debug("Starting streaming fragment " + fragment
+                            + " of resource " + dataDir);
                     while ((record = bridge.getNext()) != null) {
-						record.write(dos);
+                        record.write(dos);
                         ++recordCount;
                     }
-                    Log.debug("Finished streaming fragment " + fragment + " of resource " + dataDir + ", " + recordCount + " records.");
+                    Log.debug("Finished streaming fragment " + fragment
+                            + " of resource " + dataDir + ", " + recordCount
+                            + " records.");
                 } catch (ClientAbortException e) {
-                    // Occurs whenever client (HAWQ) decides the end the connection
+                    // Occurs whenever client (HAWQ) decides the end the
+                    // connection
                     Log.error("Remote connection closed by HAWQ", e);
                 } catch (Exception e) {
                     Log.error("Exception thrown when streaming", e);
                     throw new IOException(e.getMessage());
                 } finally {
-                    Log.debug("Stopped streaming fragment " + fragment + " of resource " + dataDir + ", " + recordCount + " records.");
+                    Log.debug("Stopped streaming fragment " + fragment
+                            + " of resource " + dataDir + ", " + recordCount
+                            + " records.");
                     if (!threadSafe) {
                         unlock(dataDir);
                     }
@@ -128,7 +147,7 @@ public class BridgeResource extends RestResource {
     }
 
     /**
-     * Lock BRIDGE_LOCK
+     * Locks BRIDGE_LOCK
      *
      * @param path path for the request, used for logging.
      */
@@ -139,7 +158,7 @@ public class BridgeResource extends RestResource {
     }
 
     /**
-     * Unlock BRIDGE_LOCK
+     * Unlocks BRIDGE_LOCK
      *
      * @param path path for the request, used for logging.
      */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java
index 258f8c2..6f77813 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java
@@ -2,9 +2,11 @@ package org.apache.hawq.pxf.service.rest;
 
 import org.apache.hawq.pxf.api.Fragment;
 import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.service.FragmenterFactory;
 import org.apache.hawq.pxf.service.FragmentsResponse;
 import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
 import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
 
@@ -21,44 +23,99 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-/*
- * Class enhances the API of the WEBHDFS REST server.
- * Returns the data fragments that a data resource is made of, enabling parallel processing of the data resource.
- * Example for querying API FRAGMENTER from a web client
- * curl -i "http://localhost:50070/pxf/v2/Fragmenter/getFragments?path=/dir1/dir2/*txt"
- * /pxf/ is made part of the path when there is a webapp by that name in tcServer.
+/**
+ * Class enhances the API of the WEBHDFS REST server. Returns the data fragments
+ * that a data resource is made of, enabling parallel processing of the data
+ * resource. Example for querying API FRAGMENTER from a web client
+ * {@code curl -i "http://localhost:50070/pxf/v2/Fragmenter/getFragments?path=/dir1/dir2/*txt"}
+ * <code>/pxf/</code> is made part of the path when there is a webapp by that
+ * name in tomcat.
  */
 @Path("/" + Version.PXF_PROTOCOL_VERSION + "/Fragmenter/")
 public class FragmenterResource extends RestResource {
-    private Log Log;
+    private static Log Log = LogFactory.getLog(FragmenterResource.class);
 
-    public FragmenterResource() throws IOException {
-        Log = LogFactory.getLog(FragmenterResource.class);
-    }
-
-    /*
-     * The function is called when http://nn:port/pxf/vx/Fragmenter/getFragments?path=...
-     * is used
+    /**
+     * The function is called when
+     * {@code http://nn:port/pxf/vx/Fragmenter/getFragments?path=...} is used.
      *
-     * @param servletContext Servlet context contains attributes required by SecuredHDFS
+     * @param servletContext Servlet context contains attributes required by
+     *            SecuredHDFS
      * @param headers Holds HTTP headers from request
      * @param path Holds URI path option used in this request
+     * @return response object with JSON serialized fragments metadata
+     * @throws Exception if getting fragments info failed
      */
     @GET
     @Path("getFragments")
     @Produces("application/json")
     public Response getFragments(@Context final ServletContext servletContext,
-            @Context final HttpHeaders headers,
-            @QueryParam("path") final String path) throws Exception {
+                                 @Context final HttpHeaders headers,
+                                 @QueryParam("path") final String path)
+            throws Exception {
+
+        ProtocolData protData = getProtocolData(servletContext, headers, path);
+
+        /* Create a fragmenter instance with API level parameters */
+        final Fragmenter fragmenter = FragmenterFactory.create(protData);
+
+        List<Fragment> fragments = fragmenter.getFragments();
+
+        fragments = AnalyzeUtils.getSampleFragments(fragments, protData);
+
+        FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse(
+                fragments, path);
+
+        return Response.ok(fragmentsResponse, MediaType.APPLICATION_JSON_TYPE).build();
+    }
+
+    /**
+     * The function is called when
+     * {@code http://nn:port/pxf/vx/Fragmenter/getFragmentsStats?path=...} is
+     * used.
+     *
+     * @param servletContext Servlet context contains attributes required by
+     *            SecuredHDFS
+     * @param headers Holds HTTP headers from request
+     * @param path Holds URI path option used in this request
+     * @return response object with JSON serialized fragments statistics
+     * @throws Exception if getting fragments info failed
+     */
+    @GET
+    @Path("getFragmentsStats")
+    @Produces("application/json")
+    public Response getFragmentsStats(@Context final ServletContext servletContext,
+                                      @Context final HttpHeaders headers,
+                                      @QueryParam("path") final String path)
+            throws Exception {
+
+        ProtocolData protData = getProtocolData(servletContext, headers, path);
+
+        /* Create a fragmenter instance with API level parameters */
+        final Fragmenter fragmenter = FragmenterFactory.create(protData);
+
+        FragmentsStats fragmentsStats = fragmenter.getFragmentsStats();
+        String response = FragmentsStats.dataToJSON(fragmentsStats);
+        if (Log.isDebugEnabled()) {
+            Log.debug(FragmentsStats.dataToString(fragmentsStats, path));
+        }
+
+        return Response.ok(response, MediaType.APPLICATION_JSON_TYPE).build();
+    }
+
+    private ProtocolData getProtocolData(final ServletContext servletContext,
+                                         final HttpHeaders headers,
+                                         final String path) throws Exception {
 
         if (Log.isDebugEnabled()) {
-            StringBuilder startMsg = new StringBuilder("FRAGMENTER started for path \"" + path + "\"");
+            StringBuilder startMsg = new StringBuilder(
+                    "FRAGMENTER started for path \"" + path + "\"");
             for (String header : headers.getRequestHeaders().keySet()) {
-                startMsg.append(" Header: ").append(header).append(" Value: ").append(headers.getRequestHeader(header));
+                startMsg.append(" Header: ").append(header).append(" Value: ").append(
+                        headers.getRequestHeader(header));
             }
             Log.debug(startMsg);
         }
@@ -73,12 +130,6 @@ public class FragmenterResource extends RestResource {
         }
         SecuredHDFS.verifyToken(protData, servletContext);
 
-        /* Create a fragmenter instance with API level parameters */
-        final Fragmenter fragmenter = FragmenterFactory.create(protData);
-
-        List<Fragment> fragments = fragmenter.getFragments();
-        FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse(fragments, path);
-
-        return Response.ok(fragmentsResponse, MediaType.APPLICATION_JSON_TYPE).build();
+        return protData;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
index 2316bc7..c1dadd1 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
@@ -12,11 +12,9 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.UriInfo;
-import java.io.IOException;
-
 
 class Version {
-    final static String PXF_PROTOCOL_VERSION = "v13";
+    final static String PXF_PROTOCOL_VERSION = "v14";
 }
 
 /**
@@ -38,7 +36,7 @@ public class InvalidPathResource {
 
     private Log Log;
 
-    public InvalidPathResource() throws IOException {
+    public InvalidPathResource() {
         super();
         Log = LogFactory.getLog(InvalidPathResource.class);
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java
new file mode 100644
index 0000000..7e0fcf1
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java
@@ -0,0 +1,128 @@
+package org.apache.hawq.pxf.service.utilities;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.api.Fragment;
+
+/**
+ * Helper class to get statistics for ANALYZE.
+ */
+public class AnalyzeUtils {
+
+    private static Log Log = LogFactory.getLog(AnalyzeUtils.class);
+
+    /**
+     * In case pxf_max_fragments parameter is declared, make sure not to get
+     * over the limit. The returned fragments are evenly distributed, in order
+     * to achieve good sampling.
+     *
+     * @param fragments fragments list
+     * @param protData container for parameters, including sampling data.
+     * @return a list of fragments no bigger than pxf_max_fragments parameter.
+     */
+    static public List<Fragment> getSampleFragments(List<Fragment> fragments,
+                                                    ProtocolData protData) {
+
+        int listSize = fragments.size();
+        int maxSize = protData.getStatsMaxFragments();
+        List<Fragment> samplingList = new ArrayList<Fragment>();
+        BitSet bitSet;
+
+        if (maxSize == 0) {
+            return fragments;
+        }
+
+        Log.debug("fragments list has " + listSize
+                + " fragments, maxFragments = " + maxSize);
+
+        bitSet = generateSamplingBitSet(listSize, maxSize);
+
+        for (int i = 0; i < listSize; ++i) {
+            if (bitSet.get(i)) {
+                samplingList.add(fragments.get(i));
+            }
+        }
+
+        return samplingList;
+    }
+
+    /**
+     * Marks sampleSize bits out of the poolSize, in a uniform way.
+     *
+     * @param poolSize pool size
+     * @param sampleSize sample size
+     * @return bit set with sampleSize bits set out of poolSize.
+     */
+    static public BitSet generateSamplingBitSet(int poolSize, int sampleSize) {
+
+        int skip = 0, chosen = 0, curIndex = 0;
+        BitSet bitSet = new BitSet();
+
+        if (poolSize <= 0 || sampleSize <= 0) {
+            return bitSet;
+        }
+
+        if (sampleSize >= poolSize) {
+            Log.debug("sampling bit map has " + poolSize + " elements (100%)");
+            bitSet.set(0, poolSize);
+            return bitSet;
+        }
+
+        skip = (poolSize / sampleSize) + 1;
+
+        while (chosen < sampleSize) {
+
+            bitSet.set(curIndex);
+            chosen++;
+            if (chosen == sampleSize) {
+                break;
+            }
+
+            for (int i = 0; i < skip; ++i) {
+                curIndex = nextClearBitModulo((++curIndex) % poolSize,
+                        poolSize, bitSet);
+                if (curIndex == -1) {
+                    // should never happen
+                    throw new IllegalArgumentException(
+                            "Trying to sample more than pool size "
+                                    + "(pool size " + poolSize
+                                    + ", sampling size " + sampleSize);
+                }
+            }
+        }
+
+        Log.debug("sampling bit map has " + chosen + " elements:"
+                + bitSet.toString());
+
+        return bitSet;
+    }
+
+    /**
+     * Returns index of next clear (false) bit, starting from and including
+     * index. If all bits from index to the end are set (true), search from the
+     * beginning. Return -1 if all bits are set (true).
+     *
+     * @param index starting point
+     * @param poolSize the bit set size
+     * @param bitSet bitset to search
+     * @return index of next clear bit, starting in index
+     */
+    static private int nextClearBitModulo(int index, int poolSize, BitSet bitSet) {
+
+        int indexToSet = bitSet.nextClearBit(index);
+        if (indexToSet == poolSize && index != 0) {
+            indexToSet = bitSet.nextClearBit(0);
+        }
+        /* means that all bits are already set, so we return -1 */
+        if (indexToSet == poolSize) {
+            return -1;
+        }
+
+        return indexToSet;
+    }
+}