You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by tr...@apache.org on 2012/09/10 23:29:03 UTC
svn commit: r1383152 [9/27] - in /incubator/hcatalog/trunk: ./
hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/
hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/
hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ s...
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java Mon Sep 10 23:28:55 2012
@@ -46,20 +46,22 @@ public class HCatFieldSchema implements
STRUCT;
public static Category fromType(Type type) {
- if (Type.ARRAY == type){
+ if (Type.ARRAY == type) {
return ARRAY;
- }else if(Type.STRUCT == type){
+ } else if (Type.STRUCT == type) {
return STRUCT;
- }else if (Type.MAP == type){
+ } else if (Type.MAP == type) {
return MAP;
- }else{
+ } else {
return PRIMITIVE;
}
}
- };
+ }
+
+ ;
- public boolean isComplex(){
- return (category == Category.PRIMITIVE) ? false : true;
+ public boolean isComplex() {
+ return (category == Category.PRIMITIVE) ? false : true;
}
/**
@@ -84,7 +86,7 @@ public class HCatFieldSchema implements
private String typeString = null;
@SuppressWarnings("unused")
- private HCatFieldSchema(){
+ private HCatFieldSchema() {
// preventing empty ctor from being callable
}
@@ -92,7 +94,7 @@ public class HCatFieldSchema implements
* Returns type of the field
* @return type of the field
*/
- public Type getType(){
+ public Type getType() {
return type;
}
@@ -100,7 +102,7 @@ public class HCatFieldSchema implements
* Returns category of the field
* @return category of the field
*/
- public Category getCategory(){
+ public Category getCategory() {
return category;
}
@@ -108,11 +110,11 @@ public class HCatFieldSchema implements
* Returns name of the field
* @return name of the field
*/
- public String getName(){
+ public String getName() {
return fieldName;
}
- public String getComment(){
+ public String getComment() {
return comment;
}
@@ -123,7 +125,7 @@ public class HCatFieldSchema implements
* @throws HCatException if call made on non-primitive types
*/
public HCatFieldSchema(String fieldName, Type type, String comment) throws HCatException {
- assertTypeInCategory(type,Category.PRIMITIVE,fieldName);
+ assertTypeInCategory(type, Category.PRIMITIVE, fieldName);
this.fieldName = fieldName;
this.type = type;
this.category = Category.PRIMITIVE;
@@ -137,21 +139,21 @@ public class HCatFieldSchema implements
* @param subSchema - subschema of the struct, or element schema of the elements in the array
* @throws HCatException if call made on Primitive or Map types
*/
- public HCatFieldSchema(String fieldName, Type type, HCatSchema subSchema,String comment) throws HCatException{
- assertTypeNotInCategory(type,Category.PRIMITIVE);
- assertTypeNotInCategory(type,Category.MAP);
+ public HCatFieldSchema(String fieldName, Type type, HCatSchema subSchema, String comment) throws HCatException {
+ assertTypeNotInCategory(type, Category.PRIMITIVE);
+ assertTypeNotInCategory(type, Category.MAP);
this.fieldName = fieldName;
this.type = type;
this.category = Category.fromType(type);
this.subSchema = subSchema;
- if(type == Type.ARRAY){
- this.subSchema.get(0).setName(null);
+ if (type == Type.ARRAY) {
+ this.subSchema.get(0).setName(null);
}
this.comment = comment;
}
private void setName(String name) {
- this.fieldName = name;
+ this.fieldName = name;
}
/**
@@ -162,9 +164,9 @@ public class HCatFieldSchema implements
* @param mapValueSchema - subschema of the value of the Map
* @throws HCatException if call made on non-Map types
*/
- public HCatFieldSchema(String fieldName, Type type, Type mapKeyType, HCatSchema mapValueSchema, String comment) throws HCatException{
- assertTypeInCategory(type,Category.MAP, fieldName);
- assertTypeInCategory(mapKeyType,Category.PRIMITIVE, fieldName);
+ public HCatFieldSchema(String fieldName, Type type, Type mapKeyType, HCatSchema mapValueSchema, String comment) throws HCatException {
+ assertTypeInCategory(type, Category.MAP, fieldName);
+ assertTypeInCategory(mapKeyType, Category.PRIMITIVE, fieldName);
this.fieldName = fieldName;
this.type = Type.MAP;
this.category = Category.MAP;
@@ -175,66 +177,66 @@ public class HCatFieldSchema implements
}
public HCatSchema getStructSubSchema() throws HCatException {
- assertTypeInCategory(this.type,Category.STRUCT, this.fieldName);
+ assertTypeInCategory(this.type, Category.STRUCT, this.fieldName);
return subSchema;
}
public HCatSchema getArrayElementSchema() throws HCatException {
- assertTypeInCategory(this.type,Category.ARRAY, this.fieldName);
+ assertTypeInCategory(this.type, Category.ARRAY, this.fieldName);
return subSchema;
}
public Type getMapKeyType() throws HCatException {
- assertTypeInCategory(this.type,Category.MAP, this.fieldName);
+ assertTypeInCategory(this.type, Category.MAP, this.fieldName);
return mapKeyType;
}
public HCatSchema getMapValueSchema() throws HCatException {
- assertTypeInCategory(this.type,Category.MAP, this.fieldName);
+ assertTypeInCategory(this.type, Category.MAP, this.fieldName);
return subSchema;
}
private static void assertTypeInCategory(Type type, Category category, String fieldName) throws HCatException {
Category typeCategory = Category.fromType(type);
- if (typeCategory != category){
- throw new HCatException("Type category mismatch. Expected "+category+" but type "+type+" in category "+typeCategory+ " (field "+fieldName+")");
+ if (typeCategory != category) {
+ throw new HCatException("Type category mismatch. Expected " + category + " but type " + type + " in category " + typeCategory + " (field " + fieldName + ")");
}
}
private static void assertTypeNotInCategory(Type type, Category category) throws HCatException {
Category typeCategory = Category.fromType(type);
- if (typeCategory == category){
- throw new HCatException("Type category mismatch. Expected type "+type+" not in category "+category+" but was so.");
+ if (typeCategory == category) {
+ throw new HCatException("Type category mismatch. Expected type " + type + " not in category " + category + " but was so.");
}
}
@Override
public String toString() {
- return new ToStringBuilder(this)
- .append("fieldName", fieldName)
- .append("comment", comment)
- .append("type", getTypeString())
- .append("category", category)
- .toString();
+ return new ToStringBuilder(this)
+ .append("fieldName", fieldName)
+ .append("comment", comment)
+ .append("type", getTypeString())
+ .append("category", category)
+ .toString();
}
- public String getTypeString(){
- if (typeString != null){
+ public String getTypeString() {
+ if (typeString != null) {
return typeString;
}
StringBuilder sb = new StringBuilder();
- if (Category.PRIMITIVE == category){
+ if (Category.PRIMITIVE == category) {
sb.append(type);
- }else if (Category.STRUCT == category){
+ } else if (Category.STRUCT == category) {
sb.append("struct<");
sb.append(subSchema.getSchemaAsTypeString());
sb.append(">");
- }else if (Category.ARRAY == category){
+ } else if (Category.ARRAY == category) {
sb.append("array<");
sb.append(subSchema.getSchemaAsTypeString());
sb.append(">");
- }else if (Category.MAP == category){
+ } else if (Category.MAP == category) {
sb.append("map<");
sb.append(mapKeyType);
sb.append(",");
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchema.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchema.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchema.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchema.java Mon Sep 10 23:28:55 2012
@@ -30,12 +30,12 @@ import org.apache.hcatalog.common.HCatEx
* HCatSchema. This class is NOT thread-safe.
*/
-public class HCatSchema implements Serializable{
+public class HCatSchema implements Serializable {
private static final long serialVersionUID = 1L;
private final List<HCatFieldSchema> fieldSchemas;
- private final Map<String,Integer> fieldPositionMap;
+ private final Map<String, Integer> fieldPositionMap;
private final List<String> fieldNames;
/**
@@ -44,44 +44,44 @@ public class HCatSchema implements Seria
* on fieldSchemas won't get reflected in HCatSchema. Each fieldSchema's name
* in the list must be unique, otherwise throws IllegalArgumentException.
*/
- public HCatSchema(final List<HCatFieldSchema> fieldSchemas){
+ public HCatSchema(final List<HCatFieldSchema> fieldSchemas) {
this.fieldSchemas = new ArrayList<HCatFieldSchema>(fieldSchemas);
int idx = 0;
- fieldPositionMap = new HashMap<String,Integer>();
+ fieldPositionMap = new HashMap<String, Integer>();
fieldNames = new ArrayList<String>();
- for (HCatFieldSchema field : fieldSchemas){
- if(field == null)
+ for (HCatFieldSchema field : fieldSchemas) {
+ if (field == null)
throw new IllegalArgumentException("Field cannot be null");
String fieldName = field.getName();
- if(fieldPositionMap.containsKey(fieldName))
+ if (fieldPositionMap.containsKey(fieldName))
throw new IllegalArgumentException("Field named " + fieldName +
- " already exists");
+ " already exists");
fieldPositionMap.put(fieldName, idx);
fieldNames.add(fieldName);
idx++;
}
}
- public void append(final HCatFieldSchema hfs) throws HCatException{
- if(hfs == null)
- throw new HCatException("Attempt to append null HCatFieldSchema in HCatSchema.");
-
- String fieldName = hfs.getName();
- if(fieldPositionMap.containsKey(fieldName))
- throw new HCatException("Attempt to append HCatFieldSchema with already " +
- "existing name: " + fieldName + ".");
-
- this.fieldSchemas.add(hfs);
- this.fieldNames.add(fieldName);
- this.fieldPositionMap.put(fieldName, this.size()-1);
+ public void append(final HCatFieldSchema hfs) throws HCatException {
+ if (hfs == null)
+ throw new HCatException("Attempt to append null HCatFieldSchema in HCatSchema.");
+
+ String fieldName = hfs.getName();
+ if (fieldPositionMap.containsKey(fieldName))
+ throw new HCatException("Attempt to append HCatFieldSchema with already " +
+ "existing name: " + fieldName + ".");
+
+ this.fieldSchemas.add(hfs);
+ this.fieldNames.add(fieldName);
+ this.fieldPositionMap.put(fieldName, this.size() - 1);
}
/**
* Users are not allowed to modify the list directly, since HCatSchema
* maintains internal state. Use append/remove to modify the schema.
*/
- public List<HCatFieldSchema> getFields(){
+ public List<HCatFieldSchema> getFields() {
return Collections.unmodifiableList(this.fieldSchemas);
}
@@ -91,14 +91,14 @@ public class HCatSchema implements Seria
* present, returns null.
*/
public Integer getPosition(String fieldName) {
- return fieldPositionMap.get(fieldName);
+ return fieldPositionMap.get(fieldName);
}
public HCatFieldSchema get(String fieldName) throws HCatException {
return get(getPosition(fieldName));
}
- public List<String> getFieldNames(){
+ public List<String> getFieldNames() {
return this.fieldNames;
}
@@ -106,32 +106,32 @@ public class HCatSchema implements Seria
return fieldSchemas.get(position);
}
- public int size(){
- return fieldSchemas.size();
+ public int size() {
+ return fieldSchemas.size();
}
public void remove(final HCatFieldSchema hcatFieldSchema) throws HCatException {
- if(!fieldSchemas.contains(hcatFieldSchema)){
- throw new HCatException("Attempt to delete a non-existent column from HCat Schema: "+ hcatFieldSchema);
- }
-
- fieldSchemas.remove(hcatFieldSchema);
- fieldPositionMap.remove(hcatFieldSchema);
- fieldNames.remove(hcatFieldSchema.getName());
+ if (!fieldSchemas.contains(hcatFieldSchema)) {
+ throw new HCatException("Attempt to delete a non-existent column from HCat Schema: " + hcatFieldSchema);
+ }
+
+ fieldSchemas.remove(hcatFieldSchema);
+ fieldPositionMap.remove(hcatFieldSchema);
+ fieldNames.remove(hcatFieldSchema.getName());
}
@Override
public String toString() {
boolean first = true;
StringBuilder sb = new StringBuilder();
- for (HCatFieldSchema hfs : fieldSchemas){
- if (!first){
+ for (HCatFieldSchema hfs : fieldSchemas) {
+ if (!first) {
sb.append(",");
- }else{
+ } else {
first = false;
}
- if (hfs.getName() != null){
+ if (hfs.getName() != null) {
sb.append(hfs.getName());
sb.append(":");
}
@@ -140,16 +140,16 @@ public class HCatSchema implements Seria
return sb.toString();
}
- public String getSchemaAsTypeString(){
+ public String getSchemaAsTypeString() {
boolean first = true;
StringBuilder sb = new StringBuilder();
- for (HCatFieldSchema hfs : fieldSchemas){
- if (!first){
+ for (HCatFieldSchema hfs : fieldSchemas) {
+ if (!first) {
sb.append(",");
- }else{
+ } else {
first = false;
}
- if (hfs.getName() != null){
+ if (hfs.getName() != null) {
sb.append(hfs.getName());
sb.append(":");
}
@@ -170,7 +170,7 @@ public class HCatSchema implements Seria
return false;
}
HCatSchema other = (HCatSchema) obj;
- if (!this.getFields().equals(other.getFields())) {
+ if (!this.getFields().equals(other.getFields())) {
return false;
}
return true;
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java Mon Sep 10 23:28:55 2012
@@ -38,15 +38,15 @@ import org.apache.hcatalog.data.schema.H
public class HCatSchemaUtils {
- public static CollectionBuilder getStructSchemaBuilder(){
+ public static CollectionBuilder getStructSchemaBuilder() {
return new CollectionBuilder();
}
- public static CollectionBuilder getListSchemaBuilder(){
+ public static CollectionBuilder getListSchemaBuilder() {
return new CollectionBuilder();
}
- public static MapBuilder getMapSchemaBuilder(){
+ public static MapBuilder getMapSchemaBuilder() {
return new MapBuilder();
}
@@ -58,21 +58,21 @@ public class HCatSchemaUtils {
public static class CollectionBuilder extends HCatSchemaBuilder { // for STRUCTS(multiple-add-calls) and LISTS(single-add-call)
List<HCatFieldSchema> fieldSchemas = null;
- CollectionBuilder(){
+ CollectionBuilder() {
fieldSchemas = new ArrayList<HCatFieldSchema>();
}
- public CollectionBuilder addField(FieldSchema fieldSchema) throws HCatException{
+ public CollectionBuilder addField(FieldSchema fieldSchema) throws HCatException {
return this.addField(getHCatFieldSchema(fieldSchema));
}
- public CollectionBuilder addField(HCatFieldSchema fieldColumnSchema){
+ public CollectionBuilder addField(HCatFieldSchema fieldColumnSchema) {
fieldSchemas.add(fieldColumnSchema);
return this;
}
@Override
- public HCatSchema build() throws HCatException{
+ public HCatSchema build() throws HCatException {
return new HCatSchema(fieldSchemas);
}
@@ -86,7 +86,7 @@ public class HCatSchemaUtils {
@Override
public HCatSchema build() throws HCatException {
List<HCatFieldSchema> fslist = new ArrayList<HCatFieldSchema>();
- fslist.add(new HCatFieldSchema(null,Type.MAP,keyType,valueSchema,null));
+ fslist.add(new HCatFieldSchema(null, Type.MAP, keyType, valueSchema, null));
return new HCatSchema(fslist);
}
@@ -118,26 +118,26 @@ public class HCatSchemaUtils {
private static HCatFieldSchema getHCatFieldSchema(String fieldName, TypeInfo fieldTypeInfo) throws HCatException {
Category typeCategory = fieldTypeInfo.getCategory();
HCatFieldSchema hCatFieldSchema;
- if (Category.PRIMITIVE == typeCategory){
- hCatFieldSchema = new HCatFieldSchema(fieldName,getPrimitiveHType(fieldTypeInfo),null);
+ if (Category.PRIMITIVE == typeCategory) {
+ hCatFieldSchema = new HCatFieldSchema(fieldName, getPrimitiveHType(fieldTypeInfo), null);
} else if (Category.STRUCT == typeCategory) {
- HCatSchema subSchema = constructHCatSchema((StructTypeInfo)fieldTypeInfo);
- hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.STRUCT,subSchema,null);
+ HCatSchema subSchema = constructHCatSchema((StructTypeInfo) fieldTypeInfo);
+ hCatFieldSchema = new HCatFieldSchema(fieldName, HCatFieldSchema.Type.STRUCT, subSchema, null);
} else if (Category.LIST == typeCategory) {
- HCatSchema subSchema = getHCatSchema(((ListTypeInfo)fieldTypeInfo).getListElementTypeInfo());
- hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.ARRAY,subSchema,null);
+ HCatSchema subSchema = getHCatSchema(((ListTypeInfo) fieldTypeInfo).getListElementTypeInfo());
+ hCatFieldSchema = new HCatFieldSchema(fieldName, HCatFieldSchema.Type.ARRAY, subSchema, null);
} else if (Category.MAP == typeCategory) {
- HCatFieldSchema.Type mapKeyType = getPrimitiveHType(((MapTypeInfo)fieldTypeInfo).getMapKeyTypeInfo());
- HCatSchema subSchema = getHCatSchema(((MapTypeInfo)fieldTypeInfo).getMapValueTypeInfo());
- hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.MAP,mapKeyType,subSchema,null);
- } else{
- throw new TypeNotPresentException(fieldTypeInfo.getTypeName(),null);
+ HCatFieldSchema.Type mapKeyType = getPrimitiveHType(((MapTypeInfo) fieldTypeInfo).getMapKeyTypeInfo());
+ HCatSchema subSchema = getHCatSchema(((MapTypeInfo) fieldTypeInfo).getMapValueTypeInfo());
+ hCatFieldSchema = new HCatFieldSchema(fieldName, HCatFieldSchema.Type.MAP, mapKeyType, subSchema, null);
+ } else {
+ throw new TypeNotPresentException(fieldTypeInfo.getTypeName(), null);
}
return hCatFieldSchema;
}
private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) {
- switch(((PrimitiveTypeInfo)basePrimitiveTypeInfo).getPrimitiveCategory()) {
+ switch (((PrimitiveTypeInfo) basePrimitiveTypeInfo).getPrimitiveCategory()) {
case BOOLEAN:
return HCatContext.getInstance().getConf().getBoolean(
HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER,
@@ -165,17 +165,17 @@ public class HCatSchemaUtils {
case BINARY:
return Type.BINARY;
default:
- throw new TypeNotPresentException(((PrimitiveTypeInfo)basePrimitiveTypeInfo).getTypeName(), null);
+ throw new TypeNotPresentException(((PrimitiveTypeInfo) basePrimitiveTypeInfo).getTypeName(), null);
}
}
- public static HCatSchema getHCatSchema(Schema schema) throws HCatException{
+ public static HCatSchema getHCatSchema(Schema schema) throws HCatException {
return getHCatSchema(schema.getFieldSchemas());
}
- public static HCatSchema getHCatSchema(List<? extends FieldSchema> fslist) throws HCatException{
+ public static HCatSchema getHCatSchema(List<? extends FieldSchema> fslist) throws HCatException {
CollectionBuilder builder = getStructSchemaBuilder();
- for (FieldSchema fieldSchema : fslist){
+ for (FieldSchema fieldSchema : fslist) {
builder.addField(fieldSchema);
}
return builder.build();
@@ -183,8 +183,8 @@ public class HCatSchemaUtils {
private static HCatSchema constructHCatSchema(StructTypeInfo stypeInfo) throws HCatException {
CollectionBuilder builder = getStructSchemaBuilder();
- for (String fieldName : ((StructTypeInfo)stypeInfo).getAllStructFieldNames()){
- builder.addField(getHCatFieldSchema(fieldName,((StructTypeInfo)stypeInfo).getStructFieldTypeInfo(fieldName)));
+ for (String fieldName : ((StructTypeInfo) stypeInfo).getAllStructFieldNames()) {
+ builder.addField(getHCatFieldSchema(fieldName, ((StructTypeInfo) stypeInfo).getStructFieldTypeInfo(fieldName)));
}
return builder.build();
}
@@ -192,22 +192,22 @@ public class HCatSchemaUtils {
public static HCatSchema getHCatSchema(TypeInfo typeInfo) throws HCatException {
Category typeCategory = typeInfo.getCategory();
HCatSchema hCatSchema;
- if (Category.PRIMITIVE == typeCategory){
- hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null,getPrimitiveHType(typeInfo),null)).build();
+ if (Category.PRIMITIVE == typeCategory) {
+ hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null, getPrimitiveHType(typeInfo), null)).build();
} else if (Category.STRUCT == typeCategory) {
HCatSchema subSchema = constructHCatSchema((StructTypeInfo) typeInfo);
- hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null,Type.STRUCT,subSchema,null)).build();
+ hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null, Type.STRUCT, subSchema, null)).build();
} else if (Category.LIST == typeCategory) {
CollectionBuilder builder = getListSchemaBuilder();
- builder.addField(getHCatFieldSchema(null,((ListTypeInfo)typeInfo).getListElementTypeInfo()));
- hCatSchema = new HCatSchema(Arrays.asList(new HCatFieldSchema("",Type.ARRAY, builder.build(), "")));
+ builder.addField(getHCatFieldSchema(null, ((ListTypeInfo) typeInfo).getListElementTypeInfo()));
+ hCatSchema = new HCatSchema(Arrays.asList(new HCatFieldSchema("", Type.ARRAY, builder.build(), "")));
} else if (Category.MAP == typeCategory) {
- HCatFieldSchema.Type mapKeyType = getPrimitiveHType(((MapTypeInfo)typeInfo).getMapKeyTypeInfo());
- HCatSchema subSchema = getHCatSchema(((MapTypeInfo)typeInfo).getMapValueTypeInfo());
+ HCatFieldSchema.Type mapKeyType = getPrimitiveHType(((MapTypeInfo) typeInfo).getMapKeyTypeInfo());
+ HCatSchema subSchema = getHCatSchema(((MapTypeInfo) typeInfo).getMapValueTypeInfo());
MapBuilder builder = getMapSchemaBuilder();
hCatSchema = builder.withKeyType(mapKeyType).withValueSchema(subSchema).build();
- } else{
- throw new TypeNotPresentException(typeInfo.getTypeName(),null);
+ } else {
+ throw new TypeNotPresentException(typeInfo.getTypeName(), null);
}
return hCatSchema;
}
@@ -217,20 +217,20 @@ public class HCatSchemaUtils {
}
public static HCatSchema getHCatSchema(String schemaString) throws HCatException {
- if ((schemaString == null) || (schemaString.trim().isEmpty())){
+ if ((schemaString == null) || (schemaString.trim().isEmpty())) {
return new HCatSchema(new ArrayList<HCatFieldSchema>()); // empty HSchema construct
}
- HCatSchema outerSchema = getHCatSchemaFromTypeString("struct<"+schemaString+">");
+ HCatSchema outerSchema = getHCatSchemaFromTypeString("struct<" + schemaString + ">");
return outerSchema.get(0).getStructSubSchema();
}
- public static FieldSchema getFieldSchema(HCatFieldSchema hcatFieldSchema){
- return new FieldSchema(hcatFieldSchema.getName(),hcatFieldSchema.getTypeString(),hcatFieldSchema.getComment());
+ public static FieldSchema getFieldSchema(HCatFieldSchema hcatFieldSchema) {
+ return new FieldSchema(hcatFieldSchema.getName(), hcatFieldSchema.getTypeString(), hcatFieldSchema.getComment());
}
- public static List<FieldSchema> getFieldSchemas(List<HCatFieldSchema> hcatFieldSchemas){
+ public static List<FieldSchema> getFieldSchemas(List<HCatFieldSchema> hcatFieldSchemas) {
List<FieldSchema> lfs = new ArrayList<FieldSchema>();
- for (HCatFieldSchema hfs : hcatFieldSchemas){
+ for (HCatFieldSchema hfs : hcatFieldSchemas) {
lfs.add(getFieldSchema(hfs));
}
return lfs;
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java Mon Sep 10 23:28:55 2012
@@ -34,102 +34,102 @@ import org.apache.hcatalog.data.transfer
public class DataTransferFactory {
- /**
- * This should be called once from master node to obtain an instance of
- * {@link HCatReader}.
- *
- * @param re
- * ReadEntity built using {@link ReadEntity.Builder}
- * @param config
- * any configuration which master node wants to pass to HCatalog
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final ReadEntity re,
- final Map<String, String> config) {
- // In future, this may examine ReadEntity and/or config to return
- // appropriate HCatReader
- return new HCatInputFormatReader(re, config);
- }
-
- /**
- * This should only be called once from every slave node to obtain an instance
- * of {@link HCatReader}.
- *
- * @param split
- * input split obtained at master node
- * @param config
- * configuration obtained at master node
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final InputSplit split,
- final Configuration config) {
- // In future, this may examine config to return appropriate HCatReader
- return getHCatReader(split, config, DefaultStateProvider.get());
- }
-
- /**
- * This should only be called once from every slave node to obtain an instance
- * of {@link HCatReader}. This should be called if an external system has some
- * state to provide to HCatalog.
- *
- * @param split
- * input split obtained at master node
- * @param config
- * configuration obtained at master node
- * @param sp
- * {@link StateProvider}
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final InputSplit split,
- final Configuration config, StateProvider sp) {
- // In future, this may examine config to return appropriate HCatReader
- return new HCatInputFormatReader(split, config, sp);
- }
-
- /**
- * This should be called at master node to obtain an instance of
- * {@link HCatWriter}.
- *
- * @param we
- * WriteEntity built using {@link WriteEntity.Builder}
- * @param config
- * any configuration which master wants to pass to HCatalog
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriteEntity we,
- final Map<String, String> config) {
- // In future, this may examine WriteEntity and/or config to return
- // appropriate HCatWriter
- return new HCatOutputFormatWriter(we, config);
- }
-
- /**
- * This should be called at slave nodes to obtain an instance of
- * {@link HCatWriter}.
- *
- * @param cntxt
- * {@link WriterContext} obtained at master node
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriterContext cntxt) {
- // In future, this may examine context to return appropriate HCatWriter
- return getHCatWriter(cntxt, DefaultStateProvider.get());
- }
-
- /**
- * This should be called at slave nodes to obtain an instance of
- * {@link HCatWriter}. If an external system has some mechanism for providing
- * state to HCatalog, this constructor can be used.
- *
- * @param cntxt
- * {@link WriterContext} obtained at master node
- * @param sp
- * {@link StateProvider}
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriterContext cntxt,
- final StateProvider sp) {
- // In future, this may examine context to return appropriate HCatWriter
- return new HCatOutputFormatWriter(cntxt.getConf(), sp);
- }
+ /**
+ * This should be called once from master node to obtain an instance of
+ * {@link HCatReader}.
+ *
+ * @param re
+ * ReadEntity built using {@link ReadEntity.Builder}
+ * @param config
+ * any configuration which master node wants to pass to HCatalog
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final ReadEntity re,
+ final Map<String, String> config) {
+ // In future, this may examine ReadEntity and/or config to return
+ // appropriate HCatReader
+ return new HCatInputFormatReader(re, config);
+ }
+
+ /**
+ * This should only be called once from every slave node to obtain an instance
+ * of {@link HCatReader}.
+ *
+ * @param split
+ * input split obtained at master node
+ * @param config
+ * configuration obtained at master node
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final InputSplit split,
+ final Configuration config) {
+ // In future, this may examine config to return appropriate HCatReader
+ return getHCatReader(split, config, DefaultStateProvider.get());
+ }
+
+ /**
+ * This should only be called once from every slave node to obtain an instance
+ * of {@link HCatReader}. This should be called if an external system has some
+ * state to provide to HCatalog.
+ *
+ * @param split
+ * input split obtained at master node
+ * @param config
+ * configuration obtained at master node
+ * @param sp
+ * {@link StateProvider}
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final InputSplit split,
+ final Configuration config, StateProvider sp) {
+ // In future, this may examine config to return appropriate HCatReader
+ return new HCatInputFormatReader(split, config, sp);
+ }
+
+ /**
+ * This should be called at master node to obtain an instance of
+ * {@link HCatWriter}.
+ *
+ * @param we
+ * WriteEntity built using {@link WriteEntity.Builder}
+ * @param config
+ * any configuration which master wants to pass to HCatalog
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriteEntity we,
+ final Map<String, String> config) {
+ // In future, this may examine WriteEntity and/or config to return
+ // appropriate HCatWriter
+ return new HCatOutputFormatWriter(we, config);
+ }
+
+ /**
+ * This should be called at slave nodes to obtain an instance of
+ * {@link HCatWriter}.
+ *
+ * @param cntxt
+ * {@link WriterContext} obtained at master node
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriterContext cntxt) {
+ // In future, this may examine context to return appropriate HCatWriter
+ return getHCatWriter(cntxt, DefaultStateProvider.get());
+ }
+
+ /**
+ * This should be called at slave nodes to obtain an instance of
+ * {@link HCatWriter}. If an external system has some mechanism for providing
+ * state to HCatalog, this constructor can be used.
+ *
+ * @param cntxt
+ * {@link WriterContext} obtained at master node
+ * @param sp
+ * {@link StateProvider}
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriterContext cntxt,
+ final StateProvider sp) {
+ // In future, this may examine context to return appropriate HCatWriter
+ return new HCatOutputFormatWriter(cntxt.getConf(), sp);
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java Mon Sep 10 23:28:55 2012
@@ -29,31 +29,31 @@ import java.util.Map;
abstract class EntityBase {
- String region;
- String tableName;
- String dbName;
- Map<String, String> partitionKVs;
-
- /**
- * Common methods for {@link ReadEntity} and {@link WriteEntity}
- */
-
- abstract static class Entity extends EntityBase {
-
- public String getRegion() {
- return region;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public Map<String, String> getPartitionKVs() {
- return partitionKVs;
+ String region;
+ String tableName;
+ String dbName;
+ Map<String, String> partitionKVs;
+
+ /**
+ * Common methods for {@link ReadEntity} and {@link WriteEntity}
+ */
+
+ abstract static class Entity extends EntityBase {
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public Map<String, String> getPartitionKVs() {
+ return partitionKVs;
+ }
}
- }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java Mon Sep 10 23:28:55 2012
@@ -34,68 +34,68 @@ import org.apache.hcatalog.data.transfer
public abstract class HCatReader {
- /**
- * This should be called at master node to obtain {@link ReaderContext} which
- * then should be serialized and sent to slave nodes.
- *
- * @return {@link ReaderContext}
- * @throws HCatException
- */
- public abstract ReaderContext prepareRead() throws HCatException;
-
- /**
- * This should be called at slave nodes to read {@link HCatRecord}s
- *
- * @return {@link Iterator} of {@link HCatRecord}
- * @throws HCatException
- */
- public abstract Iterator<HCatRecord> read() throws HCatException;
-
- /**
- * This constructor will be invoked by {@link DataTransferFactory} at master
- * node. Don't use this constructor. Instead, use {@link DataTransferFactory}
- *
- * @param re
- * @param config
- */
- protected HCatReader(final ReadEntity re, final Map<String, String> config) {
- this(config);
- this.re = re;
- }
-
- /**
- * This constructor will be invoked by {@link DataTransferFactory} at slave
- * nodes. Don't use this constructor. Instead, use {@link DataTransferFactory}
- *
- * @param config
- * @param sp
- */
-
- protected HCatReader(final Configuration config, StateProvider sp) {
- this.conf = config;
- this.sp = sp;
- }
-
- protected ReadEntity re; // This will be null at slaves.
- protected Configuration conf;
- protected ReaderContext info;
- protected StateProvider sp; // This will be null at master.
-
- private HCatReader(final Map<String, String> config) {
- Configuration conf = new Configuration();
- if (null != config) {
- for (Entry<String, String> kv : config.entrySet()) {
- conf.set(kv.getKey(), kv.getValue());
- }
+ /**
+ * This should be called at master node to obtain {@link ReaderContext} which
+ * then should be serialized and sent to slave nodes.
+ *
+ * @return {@link ReaderContext}
+ * @throws HCatException
+ */
+ public abstract ReaderContext prepareRead() throws HCatException;
+
+ /**
+ * This should be called at slave nodes to read {@link HCatRecord}s
+ *
+ * @return {@link Iterator} of {@link HCatRecord}
+ * @throws HCatException
+ */
+ public abstract Iterator<HCatRecord> read() throws HCatException;
+
+ /**
+ * This constructor will be invoked by {@link DataTransferFactory} at master
+ * node. Don't use this constructor. Instead, use {@link DataTransferFactory}
+ *
+ * @param re
+ * @param config
+ */
+ protected HCatReader(final ReadEntity re, final Map<String, String> config) {
+ this(config);
+ this.re = re;
}
- this.conf = conf;
- }
- public Configuration getConf() {
- if (null == conf) {
- throw new IllegalStateException(
- "HCatReader is not constructed correctly.");
+ /**
+ * This constructor will be invoked by {@link DataTransferFactory} at slave
+ * nodes. Don't use this constructor. Instead, use {@link DataTransferFactory}
+ *
+ * @param config
+ * @param sp
+ */
+
+ protected HCatReader(final Configuration config, StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ protected ReadEntity re; // This will be null at slaves.
+ protected Configuration conf;
+ protected ReaderContext info;
+ protected StateProvider sp; // This will be null at master.
+
+ private HCatReader(final Map<String, String> config) {
+ Configuration conf = new Configuration();
+ if (null != config) {
+ for (Entry<String, String> kv : config.entrySet()) {
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ if (null == conf) {
+ throw new IllegalStateException(
+ "HCatReader is not constructed correctly.");
+ }
+ return conf;
}
- return conf;
- }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java Mon Sep 10 23:28:55 2012
@@ -35,79 +35,79 @@ import org.apache.hcatalog.data.transfer
public abstract class HCatWriter {
- protected Configuration conf;
- protected WriteEntity we; // This will be null at slave nodes.
- protected WriterContext info;
- protected StateProvider sp;
-
- /**
- * External system should invoke this method exactly once from a master node.
- *
- * @return {@link WriterContext} This should be serialized and sent to slave
- * nodes to construct HCatWriter there.
- * @throws HCatException
- */
- public abstract WriterContext prepareWrite() throws HCatException;
-
- /**
- * This method should be used at slave needs to perform writes.
- *
- * @param recordItr
- * {@link Iterator} records to be written into HCatalog.
- * @throws {@link HCatException}
- */
- public abstract void write(final Iterator<HCatRecord> recordItr)
- throws HCatException;
-
- /**
- * This method should be called at master node. Primary purpose of this is to
- * do metadata commit.
- *
- * @throws {@link HCatException}
- */
- public abstract void commit(final WriterContext context) throws HCatException;
-
- /**
- * This method should be called at master node. Primary purpose of this is to
- * do cleanups in case of failures.
- *
- * @throws {@link HCatException} *
- */
- public abstract void abort(final WriterContext context) throws HCatException;
-
- /**
- * This constructor will be used at master node
- *
- * @param we
- * WriteEntity defines where in storage records should be written to.
- * @param config
- * Any configuration which external system wants to communicate to
- * HCatalog for performing writes.
- */
- protected HCatWriter(final WriteEntity we, final Map<String, String> config) {
- this(config);
- this.we = we;
- }
-
- /**
- * This constructor will be used at slave nodes.
- *
- * @param config
- */
- protected HCatWriter(final Configuration config, final StateProvider sp) {
- this.conf = config;
- this.sp = sp;
- }
-
- private HCatWriter(final Map<String, String> config) {
- Configuration conf = new Configuration();
- if (config != null) {
- // user is providing config, so it could be null.
- for (Entry<String, String> kv : config.entrySet()) {
- conf.set(kv.getKey(), kv.getValue());
- }
+ protected Configuration conf;
+ protected WriteEntity we; // This will be null at slave nodes.
+ protected WriterContext info;
+ protected StateProvider sp;
+
+ /**
+ * External system should invoke this method exactly once from a master node.
+ *
+ * @return {@link WriterContext} This should be serialized and sent to slave
+ * nodes to construct HCatWriter there.
+ * @throws HCatException
+ */
+ public abstract WriterContext prepareWrite() throws HCatException;
+
+ /**
+ * This method should be used at slave needs to perform writes.
+ *
+ * @param recordItr
+ * {@link Iterator} records to be written into HCatalog.
+ * @throws {@link HCatException}
+ */
+ public abstract void write(final Iterator<HCatRecord> recordItr)
+ throws HCatException;
+
+ /**
+ * This method should be called at master node. Primary purpose of this is to
+ * do metadata commit.
+ *
+ * @throws {@link HCatException}
+ */
+ public abstract void commit(final WriterContext context) throws HCatException;
+
+ /**
+ * This method should be called at master node. Primary purpose of this is to
+ * do cleanups in case of failures.
+ *
+ * @throws {@link HCatException} *
+ */
+ public abstract void abort(final WriterContext context) throws HCatException;
+
+ /**
+ * This constructor will be used at master node
+ *
+ * @param we
+ * WriteEntity defines where in storage records should be written to.
+ * @param config
+ * Any configuration which external system wants to communicate to
+ * HCatalog for performing writes.
+ */
+ protected HCatWriter(final WriteEntity we, final Map<String, String> config) {
+ this(config);
+ this.we = we;
}
- this.conf = conf;
- }
+ /**
+ * This constructor will be used at slave nodes.
+ *
+ * @param config
+ */
+ protected HCatWriter(final Configuration config, final StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ private HCatWriter(final Map<String, String> config) {
+ Configuration conf = new Configuration();
+ if (config != null) {
+ // user is providing config, so it could be null.
+ for (Entry<String, String> kv : config.entrySet()) {
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+
+ this.conf = conf;
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java Mon Sep 10 23:28:55 2012
@@ -22,67 +22,67 @@ import java.util.Map;
public class ReadEntity extends EntityBase.Entity {
- private String filterString;
-
- /**
- * Don't instantiate {@link ReadEntity} directly. Use,
- * {@link ReadEntity.Builder} instead.
- *
- */
- private ReadEntity() {
- // Not allowed
- }
-
- private ReadEntity(Builder builder) {
-
- this.region = builder.region;
- this.dbName = builder.dbName;
- this.tableName = builder.tableName;
- this.partitionKVs = builder.partitionKVs;
- this.filterString = builder.filterString;
- }
-
- public String getFilterString() {
- return this.filterString;
- }
-
- /**
- * This class should be used to build {@link ReadEntity}. It follows builder
- * pattern, letting you build your {@link ReadEntity} with whatever level of
- * detail you want.
- *
- */
- public static class Builder extends EntityBase {
-
private String filterString;
- public Builder withRegion(final String region) {
- this.region = region;
- return this;
- }
-
- public Builder withDatabase(final String dbName) {
- this.dbName = dbName;
- return this;
- }
-
- public Builder withTable(final String tblName) {
- this.tableName = tblName;
- return this;
- }
-
- public Builder withPartition(final Map<String, String> partKVs) {
- this.partitionKVs = partKVs;
- return this;
- }
-
- public Builder withFilter(String filterString) {
- this.filterString = filterString;
- return this;
- }
-
- public ReadEntity build() {
- return new ReadEntity(this);
+ /**
+ * Don't instantiate {@link ReadEntity} directly. Use,
+ * {@link ReadEntity.Builder} instead.
+ *
+ */
+ private ReadEntity() {
+ // Not allowed
+ }
+
+ private ReadEntity(Builder builder) {
+
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
+ this.filterString = builder.filterString;
+ }
+
+ public String getFilterString() {
+ return this.filterString;
+ }
+
+ /**
+ * This class should be used to build {@link ReadEntity}. It follows builder
+ * pattern, letting you build your {@link ReadEntity} with whatever level of
+ * detail you want.
+ *
+ */
+ public static class Builder extends EntityBase {
+
+ private String filterString;
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String, String> partKVs) {
+ this.partitionKVs = partKVs;
+ return this;
+ }
+
+ public Builder withFilter(String filterString) {
+ this.filterString = filterString;
+ return this;
+ }
+
+ public ReadEntity build() {
+ return new ReadEntity(this);
+ }
}
- }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java Mon Sep 10 23:28:55 2012
@@ -38,51 +38,51 @@ import org.apache.hcatalog.mapreduce.HCa
*/
public class ReaderContext implements Externalizable, Configurable {
- private static final long serialVersionUID = -2656468331739574367L;
- private List<InputSplit> splits;
- private Configuration conf;
-
- public ReaderContext() {
- this.splits = new ArrayList<InputSplit>();
- this.conf = new Configuration();
- }
-
- public void setInputSplits(final List<InputSplit> splits) {
- this.splits = splits;
- }
-
- public List<InputSplit> getSplits() {
- return splits;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- conf = config;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- out.writeInt(splits.size());
- for (InputSplit split : splits) {
- ((HCatSplit) split).write(out);
- }
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- int numOfSplits = in.readInt();
- for (int i = 0; i < numOfSplits; i++) {
- HCatSplit split = new HCatSplit();
- split.readFields(in);
- splits.add(split);
+ private static final long serialVersionUID = -2656468331739574367L;
+ private List<InputSplit> splits;
+ private Configuration conf;
+
+ public ReaderContext() {
+ this.splits = new ArrayList<InputSplit>();
+ this.conf = new Configuration();
+ }
+
+ public void setInputSplits(final List<InputSplit> splits) {
+ this.splits = splits;
+ }
+
+ public List<InputSplit> getSplits() {
+ return splits;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ out.writeInt(splits.size());
+ for (InputSplit split : splits) {
+ ((HCatSplit) split).write(out);
+ }
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ int numOfSplits = in.readInt();
+ for (int i = 0; i < numOfSplits; i++) {
+ HCatSplit split = new HCatSplit();
+ split.readFields(in);
+ splits.add(split);
+ }
}
- }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java Mon Sep 10 23:28:55 2012
@@ -22,53 +22,53 @@ import java.util.Map;
public class WriteEntity extends EntityBase.Entity {
- /**
- * Don't instantiate {@link WriteEntity} directly. Use, {@link Builder} to
- * build {@link WriteEntity}.
- */
-
- private WriteEntity() {
- // Not allowed.
- }
-
- private WriteEntity(Builder builder) {
- this.region = builder.region;
- this.dbName = builder.dbName;
- this.tableName = builder.tableName;
- this.partitionKVs = builder.partitionKVs;
- }
-
- /**
- * This class should be used to build {@link WriteEntity}. It follows builder
- * pattern, letting you build your {@link WriteEntity} with whatever level of
- * detail you want.
- *
- */
- public static class Builder extends EntityBase {
-
- public Builder withRegion(final String region) {
- this.region = region;
- return this;
- }
+ /**
+ * Don't instantiate {@link WriteEntity} directly. Use, {@link Builder} to
+ * build {@link WriteEntity}.
+ */
- public Builder withDatabase(final String dbName) {
- this.dbName = dbName;
- return this;
+ private WriteEntity() {
+ // Not allowed.
}
- public Builder withTable(final String tblName) {
- this.tableName = tblName;
- return this;
+ private WriteEntity(Builder builder) {
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
}
- public Builder withPartition(final Map<String, String> partKVs) {
- this.partitionKVs = partKVs;
- return this;
- }
+ /**
+ * This class should be used to build {@link WriteEntity}. It follows builder
+ * pattern, letting you build your {@link WriteEntity} with whatever level of
+ * detail you want.
+ *
+ */
+ public static class Builder extends EntityBase {
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String, String> partKVs) {
+ this.partitionKVs = partKVs;
+ return this;
+ }
+
+ public WriteEntity build() {
+ return new WriteEntity(this);
+ }
- public WriteEntity build() {
- return new WriteEntity(this);
}
-
- }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java Mon Sep 10 23:28:55 2012
@@ -34,31 +34,31 @@ import org.apache.hadoop.conf.Configurat
*/
public class WriterContext implements Externalizable, Configurable {
- private static final long serialVersionUID = -5899374262971611840L;
- private Configuration conf;
+ private static final long serialVersionUID = -5899374262971611840L;
+ private Configuration conf;
- public WriterContext() {
- conf = new Configuration();
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- this.conf = config;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- }
+ public WriterContext() {
+ conf = new Configuration();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ this.conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java Mon Sep 10 23:28:55 2012
@@ -42,99 +42,99 @@ import org.apache.hcatalog.shims.HCatHad
/**
* This reader reads via {@link HCatInputFormat}
- *
+ *
*/
public class HCatInputFormatReader extends HCatReader {
- private InputSplit split;
+ private InputSplit split;
- public HCatInputFormatReader(InputSplit split, Configuration config,
- StateProvider sp) {
- super(config, sp);
- this.split = split;
- }
-
- public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
- super(info, config);
- }
-
- @Override
- public ReaderContext prepareRead() throws HCatException {
-
- try {
- Job job = new Job(conf);
- InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
- re.getTableName(), re.getFilterString());
- HCatInputFormat.setInput(job, jobInfo);
- HCatInputFormat hcif = new HCatInputFormat();
- ReaderContext cntxt = new ReaderContext();
- cntxt.setInputSplits(hcif.getSplits(
- HCatHadoopShims.Instance.get().createJobContext(job.getConfiguration(), null)));
- cntxt.setConf(job.getConfiguration());
- return cntxt;
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ public HCatInputFormatReader(InputSplit split, Configuration config,
+ StateProvider sp) {
+ super(config, sp);
+ this.split = split;
}
- }
- @Override
- public Iterator<HCatRecord> read() throws HCatException {
-
- HCatInputFormat inpFmt = new HCatInputFormat();
- RecordReader<WritableComparable, HCatRecord> rr;
- try {
- TaskAttemptContext cntxt = HCatHadoopShims.Instance.get().createTaskAttemptContext(conf, new TaskAttemptID());
- rr = inpFmt.createRecordReader(split, cntxt);
- rr.initialize(split, cntxt);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
+ super(info, config);
}
- return new HCatRecordItr(rr);
- }
-
- private static class HCatRecordItr implements Iterator<HCatRecord> {
- private RecordReader<WritableComparable, HCatRecord> curRecReader;
+ @Override
+ public ReaderContext prepareRead() throws HCatException {
- HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
- curRecReader = rr;
+ try {
+ Job job = new Job(conf);
+ InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
+ re.getTableName(), re.getFilterString());
+ HCatInputFormat.setInput(job, jobInfo);
+ HCatInputFormat hcif = new HCatInputFormat();
+ ReaderContext cntxt = new ReaderContext();
+ cntxt.setInputSplits(hcif.getSplits(
+ HCatHadoopShims.Instance.get().createJobContext(job.getConfiguration(), null)));
+ cntxt.setConf(job.getConfiguration());
+ return cntxt;
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
}
@Override
- public boolean hasNext() {
- try {
- boolean retVal = curRecReader.nextKeyValue();
- if (retVal) {
- return true;
- }
- // if its false, we need to close recordReader.
- curRecReader.close();
- return false;
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
+ public Iterator<HCatRecord> read() throws HCatException {
- @Override
- public HCatRecord next() {
- try {
- return curRecReader.getCurrentValue();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ HCatInputFormat inpFmt = new HCatInputFormat();
+ RecordReader<WritableComparable, HCatRecord> rr;
+ try {
+ TaskAttemptContext cntxt = HCatHadoopShims.Instance.get().createTaskAttemptContext(conf, new TaskAttemptID());
+ rr = inpFmt.createRecordReader(split, cntxt);
+ rr.initialize(split, cntxt);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ return new HCatRecordItr(rr);
}
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Not allowed");
+ private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+ private RecordReader<WritableComparable, HCatRecord> curRecReader;
+
+ HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
+ curRecReader = rr;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ boolean retVal = curRecReader.nextKeyValue();
+ if (retVal) {
+ return true;
+ }
+ // if its false, we need to close recordReader.
+ curRecReader.close();
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public HCatRecord next() {
+ try {
+ return curRecReader.getCurrentValue();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Not allowed");
+ }
}
- }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java Mon Sep 10 23:28:55 2012
@@ -43,118 +43,118 @@ import org.apache.hcatalog.shims.HCatHad
/**
* This writer writes via {@link HCatOutputFormat}
- *
+ *
*/
public class HCatOutputFormatWriter extends HCatWriter {
- public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
- super(we, config);
- }
-
- public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
- super(config, sp);
- }
-
- @Override
- public WriterContext prepareWrite() throws HCatException {
- OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
- we.getTableName(), we.getPartitionKVs());
- Job job;
- try {
- job = new Job(conf);
- HCatOutputFormat.setOutput(job, jobInfo);
- HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
- HCatOutputFormat outFormat = new HCatOutputFormat();
- outFormat.checkOutputSpecs(job);
- outFormat.getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
- (job.getConfiguration(), HCatHadoopShims.Instance.get().createTaskAttemptID())).setupJob(job);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- WriterContext cntxt = new WriterContext();
- cntxt.setConf(job.getConfiguration());
- return cntxt;
- }
-
- @Override
- public void write(Iterator<HCatRecord> recordItr) throws HCatException {
-
- int id = sp.getId();
- setVarsInConf(id);
- HCatOutputFormat outFormat = new HCatOutputFormat();
- TaskAttemptContext cntxt = HCatHadoopShims.Instance.get().createTaskAttemptContext
- (conf, new TaskAttemptID(HCatHadoopShims.Instance.get().createTaskID(), id));
- OutputCommitter committer = null;
- RecordWriter<WritableComparable<?>, HCatRecord> writer;
- try {
- committer = outFormat.getOutputCommitter(cntxt);
- committer.setupTask(cntxt);
- writer = outFormat.getRecordWriter(cntxt);
- while (recordItr.hasNext()) {
- HCatRecord rec = recordItr.next();
- writer.write(null, rec);
- }
- writer.close(cntxt);
- if (committer.needsTaskCommit(cntxt)) {
- committer.commitTask(cntxt);
- }
- } catch (IOException e) {
- if (null != committer) {
+ public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
+ super(we, config);
+ }
+
+ public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
+ super(config, sp);
+ }
+
+ @Override
+ public WriterContext prepareWrite() throws HCatException {
+ OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
+ we.getTableName(), we.getPartitionKVs());
+ Job job;
try {
- committer.abortTask(cntxt);
- } catch (IOException e1) {
- throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ job = new Job(conf);
+ HCatOutputFormat.setOutput(job, jobInfo);
+ HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
+ HCatOutputFormat outFormat = new HCatOutputFormat();
+ outFormat.checkOutputSpecs(job);
+ outFormat.getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+ (job.getConfiguration(), HCatHadoopShims.Instance.get().createTaskAttemptID())).setupJob(job);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
}
- }
- throw new HCatException("Failed while writing", e);
- } catch (InterruptedException e) {
- if (null != committer) {
+ WriterContext cntxt = new WriterContext();
+ cntxt.setConf(job.getConfiguration());
+ return cntxt;
+ }
+
+ @Override
+ public void write(Iterator<HCatRecord> recordItr) throws HCatException {
+
+ int id = sp.getId();
+ setVarsInConf(id);
+ HCatOutputFormat outFormat = new HCatOutputFormat();
+ TaskAttemptContext cntxt = HCatHadoopShims.Instance.get().createTaskAttemptContext
+ (conf, new TaskAttemptID(HCatHadoopShims.Instance.get().createTaskID(), id));
+ OutputCommitter committer = null;
+ RecordWriter<WritableComparable<?>, HCatRecord> writer;
try {
- committer.abortTask(cntxt);
- } catch (IOException e1) {
- throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ committer = outFormat.getOutputCommitter(cntxt);
+ committer.setupTask(cntxt);
+ writer = outFormat.getRecordWriter(cntxt);
+ while (recordItr.hasNext()) {
+ HCatRecord rec = recordItr.next();
+ writer.write(null, rec);
+ }
+ writer.close(cntxt);
+ if (committer.needsTaskCommit(cntxt)) {
+ committer.commitTask(cntxt);
+ }
+ } catch (IOException e) {
+ if (null != committer) {
+ try {
+ committer.abortTask(cntxt);
+ } catch (IOException e1) {
+ throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ }
+ }
+ throw new HCatException("Failed while writing", e);
+ } catch (InterruptedException e) {
+ if (null != committer) {
+ try {
+ committer.abortTask(cntxt);
+ } catch (IOException e1) {
+ throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ }
+ }
+ throw new HCatException("Failed while writing", e);
}
- }
- throw new HCatException("Failed while writing", e);
}
- }
- @Override
- public void commit(WriterContext context) throws HCatException {
- try {
- new HCatOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
- (context.getConf(), HCatHadoopShims.Instance.get().createTaskAttemptID()))
- .commitJob(HCatHadoopShims.Instance.get().createJobContext(context.getConf(), null));
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- }
-
- @Override
- public void abort(WriterContext context) throws HCatException {
- try {
- new HCatOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
- (context.getConf(), HCatHadoopShims.Instance.get().createTaskAttemptID()))
- .abortJob(HCatHadoopShims.Instance.get().createJobContext(context.getConf(), null),State.FAILED);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- }
-
- private void setVarsInConf(int id) {
-
- // Following two config keys are required by FileOutputFormat to work
- // correctly.
- // In usual case of Hadoop, JobTracker will set these before launching
- // tasks.
- // Since there is no jobtracker here, we set it ourself.
- conf.setInt("mapred.task.partition", id);
- conf.set("mapred.task.id", "attempt__0000_r_000000_" + id);
- }
+ @Override
+ public void commit(WriterContext context) throws HCatException {
+ try {
+ new HCatOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+ (context.getConf(), HCatHadoopShims.Instance.get().createTaskAttemptID()))
+ .commitJob(HCatHadoopShims.Instance.get().createJobContext(context.getConf(), null));
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ @Override
+ public void abort(WriterContext context) throws HCatException {
+ try {
+ new HCatOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+ (context.getConf(), HCatHadoopShims.Instance.get().createTaskAttemptID()))
+ .abortJob(HCatHadoopShims.Instance.get().createJobContext(context.getConf(), null), State.FAILED);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ private void setVarsInConf(int id) {
+
+ // Following two config keys are required by FileOutputFormat to work
+ // correctly.
+ // In usual case of Hadoop, JobTracker will set these before launching
+ // tasks.
+ // Since there is no jobtracker here, we set it ourself.
+ conf.setInt("mapred.task.partition", id);
+ conf.set("mapred.task.id", "attempt__0000_r_000000_" + id);
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java Mon Sep 10 23:28:55 2012
@@ -23,25 +23,25 @@ import java.util.Random;
public class DefaultStateProvider implements StateProvider {
- /**
- * Default implementation. Here, ids are generated randomly.
- */
- @Override
- public int getId() {
+ /**
+ * Default implementation. Here, ids are generated randomly.
+ */
+ @Override
+ public int getId() {
- NumberFormat numberFormat = NumberFormat.getInstance();
- numberFormat.setMinimumIntegerDigits(5);
- numberFormat.setGroupingUsed(false);
- return Integer
- .parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
- }
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMinimumIntegerDigits(5);
+ numberFormat.setGroupingUsed(false);
+ return Integer
+ .parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
+ }
- private static StateProvider sp;
+ private static StateProvider sp;
- public static synchronized StateProvider get() {
- if (null == sp) {
- sp = new DefaultStateProvider();
+ public static synchronized StateProvider get() {
+ if (null == sp) {
+ sp = new DefaultStateProvider();
+ }
+ return sp;
}
- return sp;
- }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java Mon Sep 10 23:28:55 2012
@@ -25,10 +25,10 @@ package org.apache.hcatalog.data.transfe
*/
public interface StateProvider {
- /**
- * This method should return id assigned to slave node.
- *
- * @return id
- */
- public int getId();
+ /**
+ * This method should return id assigned to slave node.
+ *
+ * @return id
+ */
+ public int getId();
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java Mon Sep 10 23:28:55 2012
@@ -32,93 +32,93 @@ import org.apache.hcatalog.common.HCatCo
import org.apache.hcatalog.common.HCatException;
public class HarOutputCommitterPostProcessor {
-
- boolean isEnabled = false;
-
- public boolean isEnabled() {
- return isEnabled;
- }
-
- public void setEnabled(boolean enabled) {
- this.isEnabled = enabled;
- }
+ boolean isEnabled = false;
- public void exec(JobContext context, Partition partition, Path partPath) throws IOException {
+ public boolean isEnabled() {
+ return isEnabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.isEnabled = enabled;
+ }
+
+
+ public void exec(JobContext context, Partition partition, Path partPath) throws IOException {
// LOG.info("Archiving partition ["+partPath.toString()+"]");
- makeHar(context,partPath.toUri().toString(),harFile(partPath));
- partition.getParameters().put(Constants.IS_ARCHIVED, "true");
- }
-
- public String harFile(Path ptnPath) throws IOException{
- String harFile = ptnPath.toString().replaceFirst("/+$", "") + ".har";
+ makeHar(context, partPath.toUri().toString(), harFile(partPath));
+ partition.getParameters().put(Constants.IS_ARCHIVED, "true");
+ }
+
+ public String harFile(Path ptnPath) throws IOException {
+ String harFile = ptnPath.toString().replaceFirst("/+$", "") + ".har";
// LOG.info("har file : " + harFile);
- return harFile;
- }
+ return harFile;
+ }
- public String getParentFSPath(Path ptnPath) throws IOException {
- return ptnPath.toUri().getPath().replaceFirst("/+$", "");
- }
+ public String getParentFSPath(Path ptnPath) throws IOException {
+ return ptnPath.toUri().getPath().replaceFirst("/+$", "");
+ }
- public String getProcessedLocation(Path ptnPath) throws IOException {
- String harLocn = ("har://" + ptnPath.toUri().getPath()).replaceFirst("/+$", "") + ".har" + Path.SEPARATOR;
+ public String getProcessedLocation(Path ptnPath) throws IOException {
+ String harLocn = ("har://" + ptnPath.toUri().getPath()).replaceFirst("/+$", "") + ".har" + Path.SEPARATOR;
// LOG.info("har location : " + harLocn);
- return harLocn;
- }
-
-
- /**
- * Creates a har file from the contents of a given directory, using that as root.
- * @param dir Directory to archive
- * @param harFile The HAR file to create
- */
- public static void makeHar(JobContext context, String dir, String harFile) throws IOException{
+ return harLocn;
+ }
+
+
+ /**
+ * Creates a har file from the contents of a given directory, using that as root.
+ * @param dir Directory to archive
+ * @param harFile The HAR file to create
+ */
+ public static void makeHar(JobContext context, String dir, String harFile) throws IOException {
// Configuration conf = context.getConfiguration();
// Credentials creds = context.getCredentials();
-
+
// HCatUtil.logAllTokens(LOG,context);
-
- int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR);
- Path archivePath = new Path(harFile.substring(0,lastSep));
- final String[] args = {
- "-archiveName",
- harFile.substring(lastSep+1, harFile.length()),
- "-p",
- dir,
- "*",
- archivePath.toString()
- };
+
+ int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR);
+ Path archivePath = new Path(harFile.substring(0, lastSep));
+ final String[] args = {
+ "-archiveName",
+ harFile.substring(lastSep + 1, harFile.length()),
+ "-p",
+ dir,
+ "*",
+ archivePath.toString()
+ };
// for (String arg : args){
// LOG.info("Args to har : "+ arg);
// }
- try {
- Configuration newConf = new Configuration();
- FileSystem fs = archivePath.getFileSystem(newConf);
-
- String hadoopTokenFileLocationEnvSetting = System.getenv(HCatConstants.SYSENV_HADOOP_TOKEN_FILE_LOCATION);
- if ((hadoopTokenFileLocationEnvSetting != null) && (!hadoopTokenFileLocationEnvSetting.isEmpty())){
- newConf.set(HCatConstants.CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocationEnvSetting);
+ try {
+ Configuration newConf = new Configuration();
+ FileSystem fs = archivePath.getFileSystem(newConf);
+
+ String hadoopTokenFileLocationEnvSetting = System.getenv(HCatConstants.SYSENV_HADOOP_TOKEN_FILE_LOCATION);
+ if ((hadoopTokenFileLocationEnvSetting != null) && (!hadoopTokenFileLocationEnvSetting.isEmpty())) {
+ newConf.set(HCatConstants.CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocationEnvSetting);
// LOG.info("System.getenv(\"HADOOP_TOKEN_FILE_LOCATION\") =["+ System.getenv("HADOOP_TOKEN_FILE_LOCATION")+"]");
- }
+ }
// for (FileStatus ds : fs.globStatus(new Path(dir, "*"))){
// LOG.info("src : "+ds.getPath().toUri().toString());
// }
- final HadoopArchives har = new HadoopArchives(newConf);
- int rc = ToolRunner.run(har, args);
- if (rc!= 0){
- throw new Exception("Har returned error code "+rc);
- }
+ final HadoopArchives har = new HadoopArchives(newConf);
+ int rc = ToolRunner.run(har, args);
+ if (rc != 0) {
+ throw new Exception("Har returned error code " + rc);
+ }
// for (FileStatus hs : fs.globStatus(new Path(harFile, "*"))){
// LOG.info("dest : "+hs.getPath().toUri().toString());
// }
// doHarCheck(fs,harFile);
// LOG.info("Nuking " + dir);
- fs.delete(new Path(dir), true);
- } catch (Exception e){
- throw new HCatException("Error creating Har ["+harFile+"] from ["+dir+"]", e);
+ fs.delete(new Path(dir), true);
+ } catch (Exception e) {
+ throw new HCatException("Error creating Har [" + harFile + "] from [" + dir + "]", e);
+ }
}
- }
}