You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by od...@apache.org on 2016/08/26 23:05:46 UTC
[3/6] incubator-hawq git commit: HAWQ-992. PXF Hive data type check
in Fragmenter too restrictive.
HAWQ-992. PXF Hive data type check in Fragmenter too restrictive.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/e2416f49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/e2416f49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/e2416f49
Branch: refs/heads/HAWQ-992
Commit: e2416f498ebb2be29712a1042c4e9bae99f523ff
Parents: 24f5e36
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Fri Aug 26 16:04:53 2016 -0700
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Fri Aug 26 16:04:53 2016 -0700
----------------------------------------------------------------------
.../hawq/pxf/api/utilities/EnumHawqType.java | 28 +-
.../plugins/hive/HiveColumnarSerdeResolver.java | 2 +-
.../plugins/hive/HiveInputFormatFragmenter.java | 4 +-
.../hive/utilities/EnumHiveToHawqType.java | 61 +-
.../plugins/hive/utilities/HiveUtilities.java | 45 +-
.../hive/utilities/HiveUtilitiesTest.java | 65 ++
.../pxf/service/utilities/ProtocolData.java | 5 +-
.../org/apache/hawq/pxf/service/Bridge.java | 40 +
.../hawq/pxf/service/BridgeInputBuilder.java | 71 ++
.../hawq/pxf/service/BridgeOutputBuilder.java | 394 ++++++++
.../hawq/pxf/service/FragmenterFactory.java | 37 +
.../hawq/pxf/service/FragmentsResponse.java | 89 ++
.../pxf/service/FragmentsResponseFormatter.java | 157 ++++
.../hawq/pxf/service/GPDBWritableMapper.java | 135 +++
.../pxf/service/MetadataFetcherFactory.java | 36 +
.../hawq/pxf/service/MetadataResponse.java | 93 ++
.../pxf/service/MetadataResponseFormatter.java | 95 ++
.../org/apache/hawq/pxf/service/ReadBridge.java | 179 ++++
.../hawq/pxf/service/ReadSamplingBridge.java | 131 +++
.../apache/hawq/pxf/service/WriteBridge.java | 117 +++
.../hawq/pxf/service/io/BufferWritable.java | 98 ++
.../hawq/pxf/service/io/GPDBWritable.java | 893 +++++++++++++++++++
.../org/apache/hawq/pxf/service/io/Text.java | 399 +++++++++
.../apache/hawq/pxf/service/io/Writable.java | 50 ++
.../apache/hawq/pxf/service/package-info.java | 23 +
.../hawq/pxf/service/rest/BridgeResource.java | 189 ++++
.../pxf/service/rest/ClusterNodesResource.java | 148 +++
.../pxf/service/rest/FragmenterResource.java | 154 ++++
.../pxf/service/rest/InvalidPathResource.java | 179 ++++
.../hawq/pxf/service/rest/MetadataResource.java | 124 +++
.../hawq/pxf/service/rest/RestResource.java | 71 ++
.../service/rest/ServletLifecycleListener.java | 63 ++
.../hawq/pxf/service/rest/VersionResource.java | 88 ++
.../hawq/pxf/service/rest/WritableResource.java | 174 ++++
.../pxf/service/utilities/AnalyzeUtils.java | 147 +++
.../service/utilities/CustomWebappLoader.java | 231 +++++
.../pxf/service/utilities/Log4jConfigure.java | 66 ++
.../pxf/service/utilities/ProtocolData.java | 491 ++++++++++
.../hawq/pxf/service/utilities/SecureLogin.java | 61 ++
.../hawq/pxf/service/utilities/SecuredHDFS.java | 114 +++
40 files changed, 5509 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java
index 01d40f0..f35fa5e 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java
@@ -56,23 +56,27 @@ public enum EnumHawqType {
DateType("date", DataType.DATE),
TimestampType("timestamp", DataType.TIMESTAMP),
BoolType("bool", DataType.BOOLEAN),
- NumericType("numeric", DataType.NUMERIC, (byte) 2, true),
+ NumericType("numeric", DataType.NUMERIC, (byte) 2, false),
BpcharType("bpchar", DataType.BPCHAR, (byte) 1, true);
private DataType dataType;
private String typeName;
private byte modifiersNum;
- private boolean validateIntegerModifiers;
+ private boolean mandatoryModifiers;
EnumHawqType(String typeName, DataType dataType) {
this.typeName = typeName;
this.dataType = dataType;
}
- EnumHawqType(String typeName, DataType dataType, byte modifiersNum, boolean validateIntegerModifiers) {
+ EnumHawqType(String typeName, DataType dataType, byte modifiersNum) {
this(typeName, dataType);
this.modifiersNum = modifiersNum;
- this.validateIntegerModifiers = validateIntegerModifiers;
+ }
+
+ EnumHawqType(String typeName, DataType dataType, byte modifiersNum, boolean mandatoryModifiers) {
+ this(typeName, dataType, modifiersNum);
+ this.setMandatoryModifiers(mandatoryModifiers);
}
/**
@@ -93,19 +97,19 @@ public enum EnumHawqType {
/**
*
- * @return whether modifiers should be integers
- */
- public boolean getValidateIntegerModifiers() {
- return this.validateIntegerModifiers;
- }
-
- /**
- *
* @return data type
*/
public DataType getDataType() {
return this.dataType;
}
+
+ public boolean isMandatoryModifiers() {
+ return mandatoryModifiers;
+ }
+
+ public void setMandatoryModifiers(boolean mandatoryModifiers) {
+ this.mandatoryModifiers = mandatoryModifiers;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 43e3b65..606ddc6 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -127,7 +127,7 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
for (int i = 0; i < numberOfDataColumns; i++) {
ColumnDescriptor column = input.getColumn(i);
String columnName = column.columnName();
- String columnType = HiveUtilities.toHiveType(DataType.get(column.columnTypeCode()));
+ String columnType = HiveUtilities.toCompatibleHiveType(DataType.get(column.columnTypeCode()));
columnNames.append(delim).append(columnName);
columnTypes.append(delim).append(columnType);
delim = ",";
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index b944206..ccc8fa7 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -148,14 +148,14 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
for (FieldSchema hiveCol : hiveColumns) {
ColumnDescriptor colDesc = inputData.getColumn(index++);
DataType colType = DataType.get(colDesc.columnTypeCode());
- HiveUtilities.compareTypes(colType, hiveCol.getType(), colDesc.columnName());
+ HiveUtilities.validateTypeCompatible(colType, colDesc.columnTypeModifiers(), hiveCol.getType(), colDesc.columnName());
}
// check partition fields
List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
for (FieldSchema hivePart : hivePartitions) {
ColumnDescriptor colDesc = inputData.getColumn(index++);
DataType colType = DataType.get(colDesc.columnTypeCode());
- HiveUtilities.compareTypes(colType, hivePart.getType(), colDesc.columnName());
+ HiveUtilities.validateTypeCompatible(colType, colDesc.columnTypeModifiers(), hivePart.getType(), colDesc.columnName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
index 1cedaa8..9b24642 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
@@ -19,6 +19,11 @@
package org.apache.hawq.pxf.plugins.hive.utilities;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
@@ -30,8 +35,8 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException;
*/
public enum EnumHiveToHawqType {
- TinyintType("tinyint", EnumHawqType.Int2Type),
- SmallintType("smallint", EnumHawqType.Int2Type),
+ TinyintType("tinyint", EnumHawqType.Int2Type, (byte) 1),
+ SmallintType("smallint", EnumHawqType.Int2Type, (byte) 2),
IntType("int", EnumHawqType.Int4Type),
BigintType("bigint", EnumHawqType.Int8Type),
BooleanType("boolean", EnumHawqType.BoolType),
@@ -52,11 +57,17 @@ public enum EnumHiveToHawqType {
private String typeName;
private EnumHawqType hawqType;
private String splitExpression;
+ private byte size;
EnumHiveToHawqType(String typeName, EnumHawqType hawqType) {
this.typeName = typeName;
this.hawqType = hawqType;
}
+
+ EnumHiveToHawqType(String typeName, EnumHawqType hawqType, byte size) {
+ this(typeName, hawqType);
+ this.setSize(size);
+ }
EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression) {
this(typeName, hawqType);
@@ -111,15 +122,51 @@ public enum EnumHiveToHawqType {
+ hiveType + " to HAWQ's type");
}
- public static EnumHiveToHawqType getHawqToHiveType(DataType dataType) {
+ public static EnumHiveToHawqType getCompatibleHawqToHiveType(DataType dataType) {
- for (EnumHiveToHawqType t : values()) {
+ SortedSet<EnumHiveToHawqType> types = new TreeSet<EnumHiveToHawqType>(new Comparator<EnumHiveToHawqType>() {
+ public int compare(EnumHiveToHawqType a, EnumHiveToHawqType b){
+ return Byte.compare(a.getSize(), b.getSize());
+ }
+ });
+
+ for (EnumHiveToHawqType t : values()) {
if (t.getHawqType().getDataType().equals(dataType)) {
- return t;
+ types.add(t);
+ }
+ }
+
+ if (types.size() == 0)
+ throw new UnsupportedTypeException("Unable to map HAWQ's type: "
+ + dataType + " to Hive's type");
+
+ return types.last();
+ }
+
+ public static String[] extractModifiers(String hiveType) {
+ String[] result = null;
+ for (EnumHiveToHawqType t : values()) {
+ String hiveTypeName = hiveType;
+ String splitExpression = t.getSplitExpression();
+ if (splitExpression != null) {
+ String[] tokens = hiveType.split(splitExpression);
+ hiveTypeName = tokens[0];
+ result = Arrays.copyOfRange(tokens, 1, tokens.length);
+ }
+ if (t.getTypeName().toLowerCase().equals(hiveTypeName.toLowerCase())) {
+ return result;
}
}
- throw new UnsupportedTypeException("Unable to map HAWQ's type: "
- + dataType + " to Hive's type");
+ throw new UnsupportedTypeException("Unable to map Hive's type: "
+ + hiveType + " to HAWQ's type");
+ }
+
+ public byte getSize() {
+ return size;
+ }
+
+ public void setSize(byte size) {
+ this.size = size;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index 579ab0b..6bda9b7 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -139,7 +139,7 @@ public class HiveUtilities {
+ ", actual number of modifiers: "
+ modifiers.length);
}
- if (hawqType.getValidateIntegerModifiers() && !verifyIntegerModifiers(modifiers)) {
+ if (!verifyIntegerModifiers(modifiers)) {
throw new UnsupportedTypeException("HAWQ does not support type " + hiveType + " (Field " + fieldName + "), modifiers should be integers");
}
}
@@ -279,24 +279,41 @@ public class HiveUtilities {
* @return Hive type
* @throws UnsupportedTypeException if type is not supported
*/
- public static String toHiveType(DataType type) {
+ public static String toCompatibleHiveType(DataType type) {
- EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getHawqToHiveType(type);
+ EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getCompatibleHawqToHiveType(type);
return hiveToHawqType.getTypeName();
}
- public static void compareTypes(DataType type, String hiveType, String columnName) {
- String convertedHive = toHiveType(type);
- if (!convertedHive.equals(hiveType)
- && !(convertedHive.equals("smallint") && hiveType.equals("tinyint"))) {
- throw new UnsupportedTypeException(
- "Schema mismatch definition:"
- + " (Hive type " + hiveType + ", HAWQ type "
- + type.toString() + ")");
+
+
+ public static void validateTypeCompatible(DataType hawqDataType, String[] hawqTypeMods, String hiveType, String hawqColumnName) {
+
+ EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getHiveToHawqType(hiveType);
+ EnumHawqType expectedHawqType = hiveToHawqType.getHawqType();
+
+ if ((hawqTypeMods == null || hawqTypeMods.length == 0) && expectedHawqType.isMandatoryModifiers())
+ throw new UnsupportedTypeException("Invalid definition for column " + hawqColumnName + ": modifiers are mandatory for type " + expectedHawqType.getTypeName());
+
+ switch (hawqDataType) {
+ case NUMERIC:
+ String[] hiveTypeModifiers = EnumHiveToHawqType.extractModifiers(hiveType);
+ for (int i = 0; hawqTypeMods != null && i < hawqTypeMods.length; i++) {
+ if (Integer.valueOf(hawqTypeMods[i]) < Integer
+ .valueOf(hiveTypeModifiers[i]))
+ throw new UnsupportedTypeException(
+ "Invalid definition for column " + hawqColumnName
+ + ": modifiers are not compatible, "
+ + Arrays.toString(hiveTypeModifiers) + ", "
+ + Arrays.toString(hawqTypeMods));
+ }
+ break;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(" Hive type " + hiveType
- + ", HAWQ type " + type.toString());
+
+ if (!hiveToHawqType.getHawqType().equals(expectedHawqType)) {
+ throw new UnsupportedTypeException("Invalid definition for column " + hawqColumnName
+ + ": expected HAWQ type " + expectedHawqType.getTypeName() +
+ ", actual HAWQ type " + hiveToHawqType.getHawqType().getTypeName() + ")");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
index e9b024a..e94351a 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
@@ -22,11 +22,16 @@ package org.apache.hawq.pxf.plugins.hive.utilities;
import static org.junit.Assert.*;
+import java.util.Arrays;
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.EnumHawqType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.junit.Test;
import org.apache.hawq.pxf.api.Metadata;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.plugins.hive.utilities.EnumHiveToHawqType;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
public class HiveUtilitiesTest {
@@ -121,6 +126,66 @@ public class HiveUtilitiesTest {
}
@Test
+ public void testCompatibleHiveType() {
+ String compatibleTypeName = HiveUtilities.toCompatibleHiveType(DataType.SMALLINT);
+ assertEquals(compatibleTypeName, EnumHiveToHawqType.SmallintType.getTypeName());
+ }
+
+ @Test
+ public void validateSchema() throws Exception {
+ String columnName = "abc";
+
+ String[] hawqModifiers = {};
+ HiveUtilities.validateTypeCompatible(DataType.SMALLINT, hawqModifiers, EnumHiveToHawqType.TinyintType.getTypeName(), columnName);
+
+ HiveUtilities.validateTypeCompatible(DataType.SMALLINT, hawqModifiers, EnumHiveToHawqType.SmallintType.getTypeName(), columnName);
+
+ //Both Hive and HAWQ types have the same modifiers
+ hawqModifiers = new String[]{"38", "18"};
+ HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName);
+
+ //HAWQ datatype doesn't require modifiers, they are empty, Hive has non-empty modifiers
+ //Types are compatible in this case
+ hawqModifiers = new String[]{};
+ HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName);
+ hawqModifiers = null;
+ HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName);
+
+ //HAWQ datatype requires modifiers but they aren't provided
+ //Types aren't compatible
+ try {
+ hawqModifiers = new String[]{};
+ HiveUtilities.validateTypeCompatible(DataType.VARCHAR, hawqModifiers, "varchar", columnName);
+ fail("should fail with incompatible modifiers message");
+ }
+ catch (UnsupportedTypeException e) {
+ String errorMsg = "Invalid definition for column " + columnName + ": modifiers are mandatory for type " + EnumHawqType.VarcharType.getTypeName();
+ assertEquals(errorMsg, e.getMessage());
+ }
+
+
+ //HAWQ has lesser modifiers than Hive, types aren't compatible
+ try {
+ hawqModifiers = new String[]{"38", "17"};
+ HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName);
+ fail("should fail with incompatible modifiers message");
+ }
+ catch (UnsupportedTypeException e) {
+ String errorMsg = "Invalid definition for column " + columnName
+ + ": modifiers are not compatible, "
+ + Arrays.toString(new String[]{"38", "18"}) + ", "
+ + Arrays.toString(new String[]{"38", "17"});
+ assertEquals(errorMsg, e.getMessage());
+ }
+ }
+
+ @Test
+ public void extractModifiers() throws Exception {
+ String[] mods = EnumHiveToHawqType.extractModifiers("decimal(10,2)");
+ assertEquals(mods, new String[]{"10", "2"});
+ }
+
+ @Test
public void mapHiveTypeWithModifiersNegative() throws Exception {
String badHiveType = "decimal(2)";
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 5e6f6c4..2838232 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -402,9 +402,10 @@ public class ProtocolData extends InputData {
}
private String[] parseTypeMods(int columnIndex) {
- Integer typeModeCount = Integer.parseInt(getOptionalProperty("ATTR-TYPEMOD" + columnIndex + "COUNT"));
+ String typeModeCountStr = getOptionalProperty("ATTR-TYPEMOD" + columnIndex + "COUNT");
String[] result = null;
- if (typeModeCount > 0) {
+ if (typeModeCountStr != null) {
+ Integer typeModeCount = Integer.parseInt(typeModeCountStr);
result = new String[typeModeCount];
for (int i = 0; i < typeModeCount; i++) {
result[i] = getProperty("ATTR-TYPEMOD" + columnIndex + "-" + i);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java
new file mode 100644
index 0000000..bfd862a
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java
@@ -0,0 +1,40 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.service.io.Writable;
+
+import java.io.DataInputStream;
+
+/**
+ * Bridge interface - defines the interface of the Bridge classes. Any Bridge
+ * class acts as an iterator over Hadoop stored data, and should implement
+ * getNext (for reading) or setNext (for writing) for handling accessed data.
+ */
+public interface Bridge {
+ boolean beginIteration() throws Exception;
+
+ Writable getNext() throws Exception;
+
+ boolean setNext(DataInputStream inputStream) throws Exception;
+
+ boolean isThreadSafe();
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java
new file mode 100644
index 0000000..4b4d2e8
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java
@@ -0,0 +1,71 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.Text;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInput;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class BridgeInputBuilder {
+ private ProtocolData protocolData;
+ private static final Log LOG = LogFactory.getLog(BridgeInputBuilder.class);
+
+ public BridgeInputBuilder(ProtocolData protocolData) throws Exception {
+ this.protocolData = protocolData;
+ }
+
+ public List<OneField> makeInput(DataInput inputStream) throws Exception {
+ if (protocolData.outputFormat() == OutputFormat.TEXT) {
+ Text txt = new Text();
+ txt.readFields(inputStream);
+ return Collections.singletonList(new OneField(DataType.BYTEA.getOID(), txt.getBytes()));
+ }
+
+ GPDBWritable gpdbWritable = new GPDBWritable();
+ gpdbWritable.readFields(inputStream);
+
+ if (gpdbWritable.isEmpty()) {
+ LOG.debug("Reached end of stream");
+ return null;
+ }
+
+ GPDBWritableMapper mapper = new GPDBWritableMapper(gpdbWritable);
+ int[] colTypes = gpdbWritable.getColType();
+ List<OneField> record = new LinkedList<OneField>();
+ for (int i = 0; i < colTypes.length; i++) {
+ mapper.setDataType(colTypes[i]);
+ record.add(new OneField(colTypes[i], mapper.getData(i)));
+ }
+
+ return record;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
new file mode 100644
index 0000000..c59fbea
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
@@ -0,0 +1,394 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.BufferWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException;
+import org.apache.hawq.pxf.service.io.Text;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.io.DataType.TEXT;
+
+/**
+ * Class creates the output record that is piped by the java process to the HAWQ
+ * backend. Actually, the output record is serialized and the obtained byte
+ * string is piped to the HAWQ segment. The output record will implement
+ * Writable, and the mission of BridgeOutputBuilder will be to translate a list
+ * of {@link OneField} objects (obtained from the Resolver) into an output
+ * record.
+ */
+public class BridgeOutputBuilder {
+ private ProtocolData inputData;
+ private Writable output = null;
+ private LinkedList<Writable> outputList = null;
+ private Writable partialLine = null;
+ private GPDBWritable errorRecord = null;
+ private int[] schema;
+ private String[] colNames;
+ private boolean samplingEnabled = false;
+ private boolean isPartialLine = false;
+
+ private static final byte DELIM = 10; /* (byte)'\n'; */
+
+ private static final Log LOG = LogFactory.getLog(BridgeOutputBuilder.class);
+
+ /**
+ * Constructs a BridgeOutputBuilder.
+ *
+ * @param input input data, like requested output format and schema
+ * information
+ */
+ public BridgeOutputBuilder(ProtocolData input) {
+ inputData = input;
+ outputList = new LinkedList<Writable>();
+ makeErrorRecord();
+ samplingEnabled = (inputData.getStatsSampleRatio() > 0);
+ }
+
+ /**
+ * We need a separate GPDBWritable record to represent the error record.
+ * Just setting the errorFlag on the "output" GPDBWritable variable is not
+ * good enough, since the GPDBWritable is built only after the first record
+ * is read from the file. And if we encounter an error while fetching the
+ * first record from the file, then the output member will be null. The
+ * reason we cannot count on the schema to build the GPDBWritable output
+ * variable before reading the first record, is because the schema does not
+ * account for arrays - we cannot know from the schema the length of an
+ * array. We find out only after fetching the first record.
+ */
+ void makeErrorRecord() {
+ int[] errSchema = { TEXT.getOID() };
+
+ if (inputData.outputFormat() != OutputFormat.BINARY) {
+ return;
+ }
+
+ errorRecord = new GPDBWritable(errSchema);
+ errorRecord.setError(true);
+ }
+
+ /**
+ * Returns the error record. If the output format is not binary, error
+ * records are not supported, and the given exception will be thrown
+ *
+ * @param ex exception to be stored in record
+ * @return error record
+ * @throws Exception if the output format is not binary
+ */
+ public Writable getErrorOutput(Exception ex) throws Exception {
+ if (inputData.outputFormat() == OutputFormat.BINARY) {
+ errorRecord.setString(0, ex.getMessage());
+ return errorRecord;
+ } else {
+ throw ex;
+ }
+ }
+
+ /**
+ * Translates recFields (obtained from the Resolver) into an output record.
+ *
+ * @param recFields record fields to be serialized
+ * @return list of Writable objects with serialized row
+ * @throws BadRecordException if building the output record failed
+ */
+ public LinkedList<Writable> makeOutput(List<OneField> recFields)
+ throws BadRecordException {
+ if (output == null && inputData.outputFormat() == OutputFormat.BINARY) {
+ makeGPDBWritableOutput();
+ }
+
+ outputList.clear();
+
+ fillOutputRecord(recFields);
+
+ return outputList;
+ }
+
+ /**
+ * Returns whether or not this is a partial line.
+ *
+ * @return true for a partial line
+ */
+ public Writable getPartialLine() {
+ return partialLine;
+ }
+
+ /**
+ * Creates the GPDBWritable object. The object is created one time and is
+ * refilled from recFields for each record sent
+ *
+ * @return empty GPDBWritable object with set columns
+ */
+ GPDBWritable makeGPDBWritableOutput() {
+ int num_actual_fields = inputData.getColumns();
+ schema = new int[num_actual_fields];
+ colNames = new String[num_actual_fields];
+
+ for (int i = 0; i < num_actual_fields; i++) {
+ schema[i] = inputData.getColumn(i).columnTypeCode();
+ colNames[i] = inputData.getColumn(i).columnName();
+ }
+
+ output = new GPDBWritable(schema);
+
+ return (GPDBWritable) output;
+ }
+
+ /**
+ * Fills the output record based on the fields in recFields.
+ *
+ * @param recFields record fields
+ * @throws BadRecordException if building the output record failed
+ */
+ void fillOutputRecord(List<OneField> recFields) throws BadRecordException {
+ if (inputData.outputFormat() == OutputFormat.BINARY) {
+ fillGPDBWritable(recFields);
+ } else {
+ fillText(recFields);
+ }
+ }
+
+ /**
+ * Fills a GPDBWritable object based on recFields. The input record
+ * recFields must correspond to schema. If the record has more or less
+ * fields than the schema we throw an exception. We require that the type of
+ * field[i] in recFields corresponds to the type of field[i] in the schema.
+ *
+ * @param recFields record fields
+ * @throws BadRecordException if building the output record failed
+ */
+ void fillGPDBWritable(List<OneField> recFields) throws BadRecordException {
+ int size = recFields.size();
+ if (size == 0) { // size 0 means the resolver couldn't deserialize any
+ // of the record fields
+ throw new BadRecordException("No fields in record");
+ } else if (size != schema.length) {
+ throw new BadRecordException("Record has " + size
+ + " fields but the schema size is " + schema.length);
+ }
+
+ for (int i = 0; i < size; i++) {
+ OneField current = recFields.get(i);
+ if (!isTypeInSchema(current.type, schema[i])) {
+ throw new BadRecordException("For field " + colNames[i]
+ + " schema requires type "
+ + DataType.get(schema[i]).toString()
+ + " but input record has type "
+ + DataType.get(current.type).toString());
+ }
+
+ fillOneGPDBWritableField(current, i);
+ }
+
+ outputList.add(output);
+ }
+
+ /**
+ * Tests if data type is a string type. String type is a type that can be
+ * serialized as string, such as varchar, bpchar, text, numeric, timestamp,
+ * date.
+ *
+ * @param type data type
+ * @return whether data type is string type
+ */
+ boolean isStringType(DataType type) {
+ return Arrays.asList(DataType.VARCHAR, DataType.BPCHAR, DataType.TEXT,
+ DataType.NUMERIC, DataType.TIMESTAMP, DataType.DATE).contains(
+ type);
+ }
+
+ /**
+ * Tests if record field type and schema type correspond.
+ *
+ * @param recType record type code
+ * @param schemaType schema type code
+ * @return whether record type and schema type match
+ */
+ boolean isTypeInSchema(int recType, int schemaType) {
+ DataType dtRec = DataType.get(recType);
+ DataType dtSchema = DataType.get(schemaType);
+
+ return (dtSchema == DataType.UNSUPPORTED_TYPE || dtRec == dtSchema || (isStringType(dtRec) && isStringType(dtSchema)));
+ }
+
+ /**
+ * Fills a Text object based on recFields.
+ *
+ * @param recFields record fields
+ * @throws BadRecordException if text formatted record has more than one
+ * field
+ */
+ void fillText(List<OneField> recFields) throws BadRecordException {
+ /*
+ * For the TEXT case there must be only one record in the list
+ */
+ if (recFields.size() != 1) {
+ throw new BadRecordException(
+ "BridgeOutputBuilder must receive one field when handling the TEXT format");
+ }
+
+ OneField fld = recFields.get(0);
+ int type = fld.type;
+ Object val = fld.val;
+ if (DataType.get(type) == DataType.BYTEA) {// from LineBreakAccessor
+ if (samplingEnabled) {
+ convertTextDataToLines((byte[]) val);
+ } else {
+ output = new BufferWritable((byte[]) val);
+ outputList.add(output); // TODO break output into lines
+ }
+ } else { // from QuotedLineBreakAccessor
+ String textRec = (String) val;
+ output = new Text(textRec + "\n");
+ outputList.add(output);
+ }
+ }
+
/**
 * Breaks raw bytes into lines. Used only for sampling.
 *
 * When sampling a data source, we have to make sure that we deal with
 * actual rows (lines) and not bigger chunks of data such as used by
 * LineBreakAccessor for performance. The input byte array is broken into
 * lines, each one stored in the outputList. In case the read data doesn't
 * end with a line delimiter, which can happen when reading chunks of bytes,
 * the partial line is stored separately, and is being completed when
 * reading the next chunk of data.
 *
 * @param val input raw data to break into lines
 */
void convertTextDataToLines(byte[] val) {
    int len = val.length;
    int start = 0;
    int end = 0;
    byte[] line;
    BufferWritable writable;

    while (start < len) {
        // Locate the next line delimiter from the current position.
        end = ArrayUtils.indexOf(val, DELIM, start);
        if (end == ArrayUtils.INDEX_NOT_FOUND) {
            // data finished in the middle of the line
            end = len;
            isPartialLine = true;
        } else {
            end++; // include the DELIM character
            isPartialLine = false;
        }
        line = Arrays.copyOfRange(val, start, end);

        if (partialLine != null) {
            // partial data was completed: prepend the leftover bytes from
            // the previous chunk by appending this line to them
            ((BufferWritable) partialLine).append(line);
            writable = (BufferWritable) partialLine;
            partialLine = null;
        } else {
            writable = new BufferWritable(line);
        }

        if (isPartialLine) {
            // hold the incomplete tail until the next chunk arrives
            partialLine = writable;
        } else {
            outputList.add(writable);
        }
        start = end;
    }
}
+
+ /**
+ * Fills one GPDBWritable field.
+ *
+ * @param oneField field
+ * @param colIdx column index
+ * @throws BadRecordException if field type is not supported or doesn't
+ * match the schema
+ */
+ void fillOneGPDBWritableField(OneField oneField, int colIdx)
+ throws BadRecordException {
+ int type = oneField.type;
+ Object val = oneField.val;
+ GPDBWritable gpdbOutput = (GPDBWritable) output;
+ try {
+ switch (DataType.get(type)) {
+ case INTEGER:
+ gpdbOutput.setInt(colIdx, (Integer) val);
+ break;
+ case FLOAT8:
+ gpdbOutput.setDouble(colIdx, (Double) val);
+ break;
+ case REAL:
+ gpdbOutput.setFloat(colIdx, (Float) val);
+ break;
+ case BIGINT:
+ gpdbOutput.setLong(colIdx, (Long) val);
+ break;
+ case SMALLINT:
+ gpdbOutput.setShort(colIdx, (Short) val);
+ break;
+ case BOOLEAN:
+ gpdbOutput.setBoolean(colIdx, (Boolean) val);
+ break;
+ case BYTEA:
+ byte[] bts = null;
+ if (val != null) {
+ int length = Array.getLength(val);
+ bts = new byte[length];
+ for (int j = 0; j < length; j++) {
+ bts[j] = Array.getByte(val, j);
+ }
+ }
+ gpdbOutput.setBytes(colIdx, bts);
+ break;
+ case VARCHAR:
+ case BPCHAR:
+ case CHAR:
+ case TEXT:
+ case NUMERIC:
+ case TIMESTAMP:
+ case DATE:
+ gpdbOutput.setString(colIdx,
+ ObjectUtils.toString(val, null));
+ break;
+ default:
+ String valClassName = (val != null) ? val.getClass().getSimpleName()
+ : null;
+ throw new UnsupportedOperationException(valClassName
+ + " is not supported for HAWQ conversion");
+ }
+ } catch (TypeMismatchException e) {
+ throw new BadRecordException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java
new file mode 100644
index 0000000..c516d69
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java
@@ -0,0 +1,37 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+
+/**
+ * Factory class for creation of {@link Fragmenter} objects. The actual {@link Fragmenter} object is "hidden" behind
+ * an {@link Fragmenter} abstract class which is returned by the FragmenterFactory.
+ */
+public class FragmenterFactory {
+ static public Fragmenter create(InputData inputData) throws Exception {
+ String fragmenterName = inputData.getFragmenter();
+
+ return (Fragmenter) Utilities.createAnyInstance(InputData.class, fragmenterName, inputData);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java
new file mode 100644
index 0000000..d6efcae
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java
@@ -0,0 +1,89 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.apache.hawq.pxf.api.Fragment;
+
+/**
+ * Class for serializing fragments metadata in JSON format. The class implements
+ * {@link StreamingOutput} so the serialization will be done in a stream and not
+ * in one bulk, this in order to avoid running out of memory when processing a
+ * lot of fragments.
+ */
+public class FragmentsResponse implements StreamingOutput {
+
+ private static final Log Log = LogFactory.getLog(FragmentsResponse.class);
+
+ private List<Fragment> fragments;
+
+ /**
+ * Constructs fragments response out of a list of fragments
+ *
+ * @param fragments fragment list
+ */
+ public FragmentsResponse(List<Fragment> fragments) {
+ this.fragments = fragments;
+ }
+
+ /**
+ * Serializes a fragments list in JSON, To be used as the result string for
+ * HAWQ. An example result is as follows:
+ * <code>{"PXFFragments":[{"replicas":
+ * ["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"],
+ * "sourceName":"text2.csv", "index":"0","metadata":"<base64 metadata for fragment>",
+ * "userData":"<data_specific_to_third_party_fragmenter>"
+ * },{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com"
+ * ],"sourceName":"text_data.csv","index":"0","metadata":
+ * "<base64 metadata for fragment>"
+ * ,"userData":"<data_specific_to_third_party_fragmenter>"
+ * }]}</code>
+ */
+ @Override
+ public void write(OutputStream output) throws IOException,
+ WebApplicationException {
+ DataOutputStream dos = new DataOutputStream(output);
+ ObjectMapper mapper = new ObjectMapper();
+
+ dos.write("{\"PXFFragments\":[".getBytes());
+
+ String prefix = "";
+ for (Fragment fragment : fragments) {
+ StringBuilder result = new StringBuilder();
+ /* metaData and userData are automatically converted to Base64 */
+ result.append(prefix).append(mapper.writeValueAsString(fragment));
+ prefix = ",";
+ dos.write(result.toString().getBytes());
+ }
+
+ dos.write("]}".getBytes());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
new file mode 100644
index 0000000..14e87f9
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
@@ -0,0 +1,157 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Utility class for converting Fragments into a {@link FragmentsResponse} that
+ * will serialize them into JSON format.
+ */
+public class FragmentsResponseFormatter {
+
+ private static final Log LOG = LogFactory.getLog(FragmentsResponseFormatter.class);
+
+ /**
+ * Converts Fragments list to FragmentsResponse after replacing host name by
+ * their respective IPs.
+ *
+ * @param fragments list of fragments
+ * @param data data (e.g. path) related to the fragments
+ * @return FragmentsResponse with given fragments
+ * @throws UnknownHostException if converting host names to IP fails
+ */
+ public static FragmentsResponse formatResponse(List<Fragment> fragments,
+ String data)
+ throws UnknownHostException {
+ /* print the raw fragment list to log when in debug level */
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fragments before conversion to IP list:");
+ FragmentsResponseFormatter.printList(fragments, data);
+ }
+
+ /* HD-2550: convert host names to IPs */
+ convertHostsToIPs(fragments);
+
+ updateFragmentIndex(fragments);
+
+ /* print the fragment list to log when in debug level */
+ if (LOG.isDebugEnabled()) {
+ FragmentsResponseFormatter.printList(fragments, data);
+ }
+
+ return new FragmentsResponse(fragments);
+ }
+
+ /**
+ * Updates the fragments' indexes so that it is incremented by sourceName.
+ * (E.g.: {"a", 0}, {"a", 1}, {"b", 0} ... )
+ *
+ * @param fragments fragments to be updated
+ */
+ private static void updateFragmentIndex(List<Fragment> fragments) {
+
+ String sourceName = null;
+ int index = 0;
+ for (Fragment fragment : fragments) {
+
+ String currentSourceName = fragment.getSourceName();
+ if (!currentSourceName.equals(sourceName)) {
+ index = 0;
+ sourceName = currentSourceName;
+ }
+ fragment.setIndex(index++);
+ }
+ }
+
+ /**
+ * Converts hosts to their matching IP addresses.
+ *
+ * @throws UnknownHostException if converting host name to IP fails
+ */
+ private static void convertHostsToIPs(List<Fragment> fragments)
+ throws UnknownHostException {
+ /* host converted to IP map. Used to limit network calls. */
+ HashMap<String, String> hostToIpMap = new HashMap<String, String>();
+
+ for (Fragment fragment : fragments) {
+ String[] hosts = fragment.getReplicas();
+ if (hosts == null) {
+ continue;
+ }
+ String[] ips = new String[hosts.length];
+ int index = 0;
+
+ for (String host : hosts) {
+ String convertedIp = hostToIpMap.get(host);
+ if (convertedIp == null) {
+ /* find host's IP, and add to map */
+ InetAddress addr = InetAddress.getByName(host);
+ convertedIp = addr.getHostAddress();
+ hostToIpMap.put(host, convertedIp);
+ }
+
+ /* update IPs array */
+ ips[index] = convertedIp;
+ ++index;
+ }
+ fragment.setReplicas(ips);
+ }
+ }
+
+ /*
+ * Converts a fragments list to a readable string and prints it to the log.
+ * Intended for debugging purposes only. 'datapath' is the data path part of
+ * the original URI (e.g., table name, *.csv, etc).
+ */
+ private static void printList(List<Fragment> fragments, String datapath) {
+ LOG.debug("List of " + (fragments.isEmpty() ? "no" : fragments.size())
+ + "fragments for \"" + datapath + "\"");
+
+ int i = 0;
+ for (Fragment fragment : fragments) {
+ StringBuilder result = new StringBuilder();
+ result.append("Fragment #").append(++i).append(": [").append(
+ "Source: ").append(fragment.getSourceName()).append(
+ ", Index: ").append(fragment.getIndex()).append(
+ ", Replicas:");
+ for (String host : fragment.getReplicas()) {
+ result.append(" ").append(host);
+ }
+
+ result.append(", Metadata: ").append(
+ new String(fragment.getMetadata()));
+
+ if (fragment.getUserData() != null) {
+ result.append(", User Data: ").append(
+ new String(fragment.getUserData()));
+ }
+ result.append("] ");
+ LOG.debug(result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java
new file mode 100644
index 0000000..e1c2eb4
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java
@@ -0,0 +1,135 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException;
+
+/*
+ * Class for mapping GPDBWritable get functions to java types.
+ */
+public class GPDBWritableMapper {
+
+ private GPDBWritable gpdbWritable;
+ private int type;
+ private DataGetter getter = null;
+
+ public GPDBWritableMapper(GPDBWritable gpdbWritable) {
+ this.gpdbWritable = gpdbWritable;
+ }
+
+ public void setDataType(int type) throws UnsupportedTypeException {
+ this.type = type;
+
+ switch (DataType.get(type)) {
+ case BOOLEAN:
+ getter = new BooleanDataGetter();
+ break;
+ case BYTEA:
+ getter = new BytesDataGetter();
+ break;
+ case BIGINT:
+ getter = new LongDataGetter();
+ break;
+ case SMALLINT:
+ getter = new ShortDataGetter();
+ break;
+ case INTEGER:
+ getter = new IntDataGetter();
+ break;
+ case TEXT:
+ getter = new StringDataGetter();
+ break;
+ case REAL:
+ getter = new FloatDataGetter();
+ break;
+ case FLOAT8:
+ getter = new DoubleDataGetter();
+ break;
+ default:
+ throw new UnsupportedTypeException(
+ "Type " + GPDBWritable.getTypeName(type) +
+ " is not supported by GPDBWritable");
+ }
+ }
+
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return getter.getData(colIdx);
+ }
+
+ private interface DataGetter {
+ abstract Object getData(int colIdx) throws TypeMismatchException;
+ }
+
+ private class BooleanDataGetter implements DataGetter {
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return gpdbWritable.getBoolean(colIdx);
+ }
+ }
+
+ private class BytesDataGetter implements DataGetter {
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return gpdbWritable.getBytes(colIdx);
+ }
+ }
+
+ private class DoubleDataGetter implements DataGetter {
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return gpdbWritable.getDouble(colIdx);
+ }
+ }
+
+ private class FloatDataGetter implements DataGetter {
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return gpdbWritable.getFloat(colIdx);
+ }
+ }
+
+ private class IntDataGetter implements DataGetter {
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return gpdbWritable.getInt(colIdx);
+ }
+ }
+
+ private class LongDataGetter implements DataGetter {
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return gpdbWritable.getLong(colIdx);
+ }
+ }
+
+ private class ShortDataGetter implements DataGetter {
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return gpdbWritable.getShort(colIdx);
+ }
+ }
+
+ private class StringDataGetter implements DataGetter {
+ public Object getData(int colIdx) throws TypeMismatchException {
+ return gpdbWritable.getString(colIdx);
+ }
+ }
+
+ public String toString() {
+ return "getter type = " + GPDBWritable.getTypeName(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
new file mode 100644
index 0000000..396b711
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
@@ -0,0 +1,36 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.MetadataFetcher;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+
+/**
+ * Factory class for creation of {@link MetadataFetcher} objects.
+ * The actual {@link MetadataFetcher} object is "hidden" behind an {@link MetadataFetcher}
+ * abstract class which is returned by the MetadataFetcherFactory.
+ */
+public class MetadataFetcherFactory {
+ public static MetadataFetcher create(InputData inputData) throws Exception {
+ return (MetadataFetcher) Utilities.createAnyInstance(InputData.class, inputData.getMetadata(), inputData);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java
new file mode 100644
index 0000000..741e201
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java
@@ -0,0 +1,93 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import java.util.List;
+
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.Metadata;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+/**
+ * Class for serializing metadata in JSON format. The class implements
+ * {@link StreamingOutput} so the serialization will be done in a stream and not
+ * in one bulk, this in order to avoid running out of memory when processing a
+ * lot of items.
+ */
+public class MetadataResponse implements StreamingOutput {
+
+ private static final Log Log = LogFactory.getLog(MetadataResponse.class);
+ private static final String METADATA_DEFAULT_RESPONSE = "{\"PXFMetadata\":[]}";
+
+ private List<Metadata> metadataList;
+
+ /**
+ * Constructs metadata response out of a metadata list
+ *
+ * @param metadataList metadata list
+ */
+ public MetadataResponse(List<Metadata> metadataList) {
+ this.metadataList = metadataList;
+ }
+
+ /**
+ * Serializes the metadata list in JSON, To be used as the result string for HAWQ.
+ */
+ @Override
+ public void write(OutputStream output) throws IOException {
+ DataOutputStream dos = new DataOutputStream(output);
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(org.codehaus.jackson.map.SerializationConfig.Feature.USE_ANNOTATIONS, true); // enable annotations for serialization
+ mapper.setSerializationInclusion(Inclusion.NON_EMPTY); // ignore empty fields
+
+ if(metadataList == null || metadataList.isEmpty()) {
+ dos.write(METADATA_DEFAULT_RESPONSE.getBytes());
+ return;
+ }
+
+ dos.write("{\"PXFMetadata\":[".getBytes());
+
+ String prefix = "";
+ for (Metadata metadata : metadataList) {
+ if(metadata == null) {
+ throw new IllegalArgumentException("metadata object is null - cannot serialize");
+ }
+ if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) {
+ throw new IllegalArgumentException("metadata for " + metadata.getItem() + " contains no fields - cannot serialize");
+ }
+ StringBuilder result = new StringBuilder();
+ result.append(prefix).append(mapper.writeValueAsString(metadata));
+ prefix = ",";
+ dos.write(result.toString().getBytes());
+ }
+
+ dos.write("]}".getBytes());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
new file mode 100644
index 0000000..8225ec5
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
@@ -0,0 +1,95 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+import org.apache.hawq.pxf.api.Metadata;
+
+/**
+ * Utility class for converting {@link Metadata} into a JSON format.
+ */
+public class MetadataResponseFormatter {
+
+ private static final Log LOG = LogFactory.getLog(MetadataResponseFormatter.class);
+
+ /**
+ * Converts list of {@link Metadata} to JSON String format.
+ *
+ * @param metadataList list of metadata objects to convert
+ * @param path path string
+ * @return JSON formatted response
+ * @throws IOException if converting the data to JSON fails
+ */
+ public static MetadataResponse formatResponse(List<Metadata> metadataList, String path) throws IOException {
+ /* print the fragment list to log when in debug level */
+ if (LOG.isDebugEnabled()) {
+ MetadataResponseFormatter.printMetadata(metadataList, path);
+ }
+
+ return new MetadataResponse(metadataList);
+ }
+
+ /**
+ * Converts metadata list to a readable string.
+ * Intended for debugging purposes only.
+ */
+ private static void printMetadata(List<Metadata> metadataList, String path) {
+ LOG.debug("Metadata List for path " + path + ": ");
+
+ if (null == metadataList || metadataList.isEmpty()) {
+ LOG.debug("No metadata");
+ return;
+ }
+
+ for(Metadata metadata: metadataList) {
+ StringBuilder result = new StringBuilder();
+
+ if (metadata == null) {
+ result.append("None");
+ LOG.debug(result);
+ continue;
+ }
+
+ result.append("Metadata for item \"").append(metadata.getItem()).append("\": ");
+
+ if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) {
+ result.append("None");
+ } else {
+ int i = 0;
+ for (Metadata.Field field : metadata.getFields()) {
+ result.append("Field #").append(++i).append(": [")
+ .append("Name: ").append(field.getName())
+ .append(", Type: ").append(field.getType().getTypeName())
+ .append(", Source type: ").append(field.getSourceType()).append("] ");
+ }
+ }
+ LOG.debug(result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
new file mode 100644
index 0000000..01a95ab
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
@@ -0,0 +1,179 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.nio.charset.CharacterCodingException;
+import java.util.LinkedList;
+import java.util.zip.ZipException;
+
+/**
+ * ReadBridge class creates appropriate accessor and resolver. It will then
+ * create the correct output conversion class (e.g. Text or GPDBWritable) and
+ * get records from accessor, let resolver deserialize them and reserialize them
+ * using the output conversion class. <br>
+ * The class handles BadRecordException and other exception type and marks the
+ * record as invalid for HAWQ.
+ */
+public class ReadBridge implements Bridge {
+ // accessor fetches raw records (OneRow) from the underlying store
+ ReadAccessor fileAccessor = null;
+ // resolver deserializes each raw record into a list of fields
+ ReadResolver fieldsResolver = null;
+ // builder serializes resolved fields into the HAWQ wire format
+ BridgeOutputBuilder outputBuilder = null;
+ // serialized records not yet handed to the caller; one resolved record
+ // may expand into several output Writables
+ LinkedList<Writable> outputQueue = null;
+
+ private static final Log LOG = LogFactory.getLog(ReadBridge.class);
+
+ /**
+ * C'tor - set the implementation of the bridge.
+ *
+ * @param protData input containing accessor and resolver names
+ * @throws Exception if accessor or resolver can't be instantiated
+ */
+ public ReadBridge(ProtocolData protData) throws Exception {
+ outputBuilder = new BridgeOutputBuilder(protData);
+ outputQueue = new LinkedList<Writable>();
+ fileAccessor = getFileAccessor(protData);
+ fieldsResolver = getFieldsResolver(protData);
+ }
+
+ /**
+ * Accesses the underlying HDFS file.
+ */
+ @Override
+ public boolean beginIteration() throws Exception {
+ return fileAccessor.openForRead();
+ }
+
+ /**
+ * Fetches the next object from the file and turns it into a record that
+ * the HAWQ backend can process. Returns null when the fragment is
+ * exhausted.
+ */
+ @Override
+ public Writable getNext() throws Exception {
+ Writable output = null;
+ OneRow onerow = null;
+
+ // serve any record left over from the previous resolver output first
+ if (!outputQueue.isEmpty()) {
+ return outputQueue.pop();
+ }
+
+ try {
+ while (outputQueue.isEmpty()) {
+ onerow = fileAccessor.readNextObject();
+ if (onerow == null) {
+ // end of fragment - release the accessor before returning
+ fileAccessor.closeForRead();
+ output = outputBuilder.getPartialLine();
+ if (output != null) {
+ LOG.warn("A partial record in the end of the fragment");
+ }
+ // if there is a partial line, return it now, otherwise it
+ // will return null
+ return output;
+ }
+
+ // we checked before that outputQueue is empty, so we can
+ // override it.
+ outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+ if (!outputQueue.isEmpty()) {
+ output = outputQueue.pop();
+ break;
+ }
+ }
+ } catch (IOException ex) {
+ if (!isDataException(ex)) {
+ // genuine I/O failure - close the accessor and propagate
+ fileAccessor.closeForRead();
+ throw ex;
+ }
+ // data-level problem: report the record as invalid, keep the
+ // accessor open so subsequent calls continue reading
+ output = outputBuilder.getErrorOutput(ex);
+ } catch (BadRecordException ex) {
+ String row_info = "null";
+ if (onerow != null) {
+ row_info = onerow.toString();
+ }
+ if (ex.getCause() != null) {
+ LOG.debug("BadRecordException " + ex.getCause().toString()
+ + ": " + row_info);
+ } else {
+ LOG.debug(ex.toString() + ": " + row_info);
+ }
+ // mark the record as invalid for HAWQ; reading resumes on the
+ // next call
+ output = outputBuilder.getErrorOutput(ex);
+ } catch (Exception ex) {
+ // unexpected failure - make sure the accessor is released
+ fileAccessor.closeForRead();
+ throw ex;
+ }
+
+ return output;
+ }
+
+ /**
+ * Instantiates the ReadAccessor class named in the request.
+ *
+ * @param inputData input containing the accessor class name
+ * @return the accessor instance
+ * @throws Exception if the accessor can't be instantiated
+ */
+ public static ReadAccessor getFileAccessor(InputData inputData)
+ throws Exception {
+ return (ReadAccessor) Utilities.createAnyInstance(InputData.class,
+ inputData.getAccessor(), inputData);
+ }
+
+ /**
+ * Instantiates the ReadResolver class named in the request.
+ *
+ * @param inputData input containing the resolver class name
+ * @return the resolver instance
+ * @throws Exception if the resolver can't be instantiated
+ */
+ public static ReadResolver getFieldsResolver(InputData inputData)
+ throws Exception {
+ return (ReadResolver) Utilities.createAnyInstance(InputData.class,
+ inputData.getResolver(), inputData);
+ }
+
+ /*
+ * There are many exceptions that inherit IOException. Some of them like
+ * EOFException are generated due to a data problem, and not because of an
+ * IO/connection problem as the father IOException might lead us to believe.
+ * For example, an EOFException will be thrown while fetching a record from
+ * a sequence file, if there is a formatting problem in the record. Fetching
+ * record from the sequence-file is the responsibility of the accessor so
+ * the exception will be thrown from the accessor. We identify this cases by
+ * analyzing the exception type, and when we discover that the actual
+ * problem was a data problem, we return the errorOutput GPDBWritable.
+ */
+ private boolean isDataException(IOException ex) {
+ return (ex instanceof EOFException
+ || ex instanceof CharacterCodingException
+ || ex instanceof CharConversionException
+ || ex instanceof UTFDataFormatException || ex instanceof ZipException);
+ }
+
+ /**
+ * Not supported - ReadBridge only reads.
+ */
+ @Override
+ public boolean setNext(DataInputStream inputStream) {
+ throw new UnsupportedOperationException("setNext is not implemented");
+ }
+
+ /**
+ * Thread safe only when both the accessor and the resolver are.
+ */
+ @Override
+ public boolean isThreadSafe() {
+ boolean result = ((Plugin) fileAccessor).isThreadSafe()
+ && ((Plugin) fieldsResolver).isThreadSafe();
+ LOG.debug("Bridge is " + (result ? "" : "not ") + "thread safe");
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
new file mode 100644
index 0000000..d5ae66a
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
@@ -0,0 +1,131 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInputStream;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+/**
+ * ReadSamplingBridge wraps a ReadBridge, and returns only some of the output
+ * records, based on a ratio sample. The sample to pass or discard a record is
+ * done after all of the processing is completed (
+ * {@code accessor -> resolver -> output builder}) to make sure there are no
+ * chunks of data instead of single records. <br>
+ * The goal is to get as uniform as possible sampling. This is achieved by
+ * creating a bit map matching the precision of the sampleRatio, so that for a
+ * ratio of 0.034, a bit-map of 1000 bits will be created, and 34 bits will be
+ * set. This map is matched against each read record, discarding ones with a 0
+ * bit and continuing until a 1 bit record is read.
+ */
+public class ReadSamplingBridge implements Bridge {
+
+    // underlying bridge producing every record; sampling is applied on top
+    ReadBridge bridge;
+
+    float sampleRatio;
+    // bit map encoding which record positions are kept (set) or skipped (clear)
+    BitSet sampleBitSet;
+    int bitSetSize;
+    int sampleSize;
+    // current position inside the bit map; wraps around at bitSetSize
+    int curIndex;
+
+    private static final Log LOG = LogFactory.getLog(ReadSamplingBridge.class);
+
+    /**
+     * C'tor - set the implementation of the bridge.
+     *
+     * @param protData input containing sampling ratio
+     * @throws Exception if the sampling ratio is wrong
+     */
+    public ReadSamplingBridge(ProtocolData protData) throws Exception {
+        bridge = new ReadBridge(protData);
+
+        this.sampleRatio = protData.getStatsSampleRatio();
+        if (sampleRatio < 0.0001 || sampleRatio > 1.0) {
+            throw new IllegalArgumentException(
+                    "sampling ratio must be a value between 0.0001 and 1.0. "
+                            + "(value = " + sampleRatio + ")");
+        }
+
+        calculateBitSetSize();
+
+        this.sampleBitSet = AnalyzeUtils.generateSamplingBitSet(bitSetSize,
+                sampleSize);
+        this.curIndex = 0;
+    }
+
+    /*
+     * Determines the smallest bit-map size (down to 100 bits) that still
+     * represents sampleRatio exactly, e.g. ratio 0.034 uses a 1000-bit map
+     * with 34 bits set.
+     */
+    private void calculateBitSetSize() {
+
+        // start with 4 digits of precision (ratio is bounded below by 0.0001)
+        sampleSize = (int) (sampleRatio * 10000);
+        bitSetSize = 10000;
+
+        // shrink by factors of 10 as long as no precision is lost
+        while ((bitSetSize > 100) && (sampleSize % 10 == 0)) {
+            bitSetSize /= 10;
+            sampleSize /= 10;
+        }
+        LOG.debug("bit set size = " + bitSetSize + " sample size = "
+                + sampleSize);
+    }
+
+    /**
+     * Fetches next sample, according to the sampling ratio.
+     */
+    @Override
+    public Writable getNext() throws Exception {
+        Writable output = bridge.getNext();
+
+        // sample - if bit is false, advance to the next object
+        while (!sampleBitSet.get(curIndex)) {
+
+            if (output == null) {
+                // end of data - pass the null through to the caller
+                break;
+            }
+            incIndex();
+            output = bridge.getNext();
+        }
+
+        incIndex();
+        return output;
+    }
+
+    // advances the bit-map cursor, wrapping around at the end of the map.
+    // (fixed: was the redundant self-assignment "curIndex = (++curIndex) % ...")
+    private void incIndex() {
+        curIndex = (curIndex + 1) % bitSetSize;
+    }
+
+    @Override
+    public boolean beginIteration() throws Exception {
+        return bridge.beginIteration();
+    }
+
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+        return bridge.setNext(inputStream);
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return bridge.isThreadSafe();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
new file mode 100644
index 0000000..c3ee731
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
@@ -0,0 +1,117 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInputStream;
+import java.util.List;
+
+/*
+ * WriteBridge class creates appropriate accessor and resolver.
+ * It reads data from inputStream by the resolver,
+ * and writes it to the Hadoop storage with the accessor.
+ */
+public class WriteBridge implements Bridge {
+    private static final Log LOG = LogFactory.getLog(WriteBridge.class);
+    WriteAccessor fileAccessor = null;
+    WriteResolver fieldsResolver = null;
+    BridgeInputBuilder inputBuilder;
+
+    /**
+     * C'tor - set the implementation of the bridge.
+     *
+     * @param protocolData input containing accessor and resolver names
+     * @throws Exception if accessor or resolver can't be instantiated
+     */
+    public WriteBridge(ProtocolData protocolData) throws Exception {
+
+        inputBuilder = new BridgeInputBuilder(protocolData);
+        /* plugins accept InputData parameters */
+        fileAccessor = getFileAccessor(protocolData);
+        fieldsResolver = getFieldsResolver(protocolData);
+
+    }
+
+    /**
+     * Accesses the underlying HDFS file.
+     */
+    @Override
+    public boolean beginIteration() throws Exception {
+        return fileAccessor.openForWrite();
+    }
+
+    /**
+     * Reads data from the stream, converts it using the WriteResolver into a
+     * OneRow object, and passes it to the WriteAccessor to write into the file.
+     *
+     * @return false when the input stream is exhausted, true otherwise
+     * @throws BadRecordException if the accessor refused to write the record
+     */
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+
+        List<OneField> record = inputBuilder.makeInput(inputStream);
+        if (record == null) {
+            close();
+            return false;
+        }
+
+        OneRow onerow = fieldsResolver.setFields(record);
+        if (onerow == null) {
+            close();
+            return false;
+        }
+        if (!fileAccessor.writeNextObject(onerow)) {
+            close();
+            throw new BadRecordException();
+        }
+        return true;
+    }
+
+    // closes the accessor, logging any failure before rethrowing it
+    private void close() throws Exception {
+        try {
+            fileAccessor.closeForWrite();
+        } catch (Exception e) {
+            // pass the Throwable to the logger so the stack trace is kept
+            // (fixed: previously only e.getMessage() was logged)
+            LOG.error("Failed to close bridge resources: " + e.getMessage(), e);
+            throw e;
+        }
+    }
+
+    private static WriteAccessor getFileAccessor(InputData inputData) throws Exception {
+        return (WriteAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData);
+    }
+
+    private static WriteResolver getFieldsResolver(InputData inputData) throws Exception {
+        return (WriteResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData);
+    }
+
+    /**
+     * Not supported - WriteBridge only writes.
+     */
+    @Override
+    public Writable getNext() {
+        throw new UnsupportedOperationException("getNext is not implemented");
+    }
+
+    /**
+     * Thread safe only when both the accessor and the resolver are.
+     */
+    @Override
+    public boolean isThreadSafe() {
+        return ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe();
+    }
+}