You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2016/02/29 20:37:27 UTC
sqoop git commit: SQOOP-2849: Sqoop2: Job failure when writing parquet in hdfs with data coming from mysql
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 f9d7c3a8e -> e06190b2f
SQOOP-2849: Sqoop2: Job failure when writing parquet in hdfs with data coming from mysql
(Abraham Fine via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e06190b2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e06190b2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e06190b2
Branch: refs/heads/sqoop2
Commit: e06190b2f8d10f1e0cef2abd66b052b56c1e679f
Parents: f9d7c3a
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Feb 29 11:37:06 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Feb 29 11:37:06 2016 -0800
----------------------------------------------------------------------
.../sqoop/connector/common/SqoopAvroUtils.java | 19 ++++++++--
.../idf/AVROIntermediateDataFormat.java | 39 +++++++++++---------
.../idf/TestAVROIntermediateDataFormat.java | 7 ++++
3 files changed, 45 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e06190b2/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
index 89bc0f2..f34521c 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.connector.common;
import org.apache.avro.Schema;
+import org.apache.log4j.Logger;
import org.apache.sqoop.classification.InterfaceAudience;
import org.apache.sqoop.classification.InterfaceStability;
import org.apache.sqoop.common.SqoopException;
@@ -36,6 +37,8 @@ import java.util.Set;
@InterfaceStability.Unstable
public class SqoopAvroUtils {
+ private static final Logger LOG = Logger.getLogger(SqoopAvroUtils.class);
+
public static final String COLUMN_TYPE = "columnType";
public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";
@@ -44,14 +47,14 @@ public class SqoopAvroUtils {
*/
public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
// avro schema names cannot start with quotes, lets just remove them
- String name = sqoopSchema.getName().replace("\"", "");
+ String name = createAvroName(sqoopSchema.getName());
String doc = sqoopSchema.getNote();
String namespace = SQOOP_SCHEMA_NAMESPACE;
Schema schema = Schema.createRecord(name, doc, namespace, false);
List<Schema.Field> fields = new ArrayList<Schema.Field>();
for (Column column : sqoopSchema.getColumnsArray()) {
- Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null);
+ Schema.Field field = new Schema.Field(createAvroName(column.getName()), createAvroFieldSchema(column), null, null);
field.addProp(COLUMN_TYPE, column.getType().toString());
fields.add(field);
}
@@ -59,6 +62,16 @@ public class SqoopAvroUtils {
return schema;
}
+ // From the avro docs:
+ // The name portion of a fullname, record field names, and enum symbols must:
+ // start with [A-Za-z_]
+ // subsequently contain only [A-Za-z0-9_]
+ public static String createAvroName(String name) {
+ String avroName = name.replaceFirst("^[0-9]", "").replaceAll("[^a-zA-Z0-9_]", "");
+ LOG.debug("Replacing name: " + name + " with Avro name: " + avroName);
+ return avroName;
+ }
+
public static Schema createAvroFieldSchema(Column column) {
Schema schema = toAvroFieldType(column);
if (!column.isNullable()) {
@@ -123,7 +136,7 @@ public class SqoopAvroUtils {
assert column instanceof org.apache.sqoop.schema.type.Enum;
Set<String> options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
List<String> listOptions = new ArrayList<String>(options);
- return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
+ return Schema.createEnum(createAvroName(column.getName()), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
}
public static byte[] getBytesFromByteBuffer(Object obj) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e06190b2/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
index b55f7a0..650e24c 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
@@ -36,6 +36,7 @@ import org.apache.avro.util.Utf8;
import org.apache.sqoop.classification.InterfaceAudience;
import org.apache.sqoop.classification.InterfaceStability;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.common.SqoopAvroUtils;
import org.apache.sqoop.error.code.IntermediateDataFormatError;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.utils.ClassUtils;
@@ -166,11 +167,12 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
+ String name = SqoopAvroUtils.createAvroName(columns[i].getName());
if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
- avroObject.put(columns[i].getName(), null);
+ avroObject.put(name, null);
continue;
}
- avroObject.put(columns[i].getName(), toAVRO(csvStringArray[i], columns[i]));
+ avroObject.put(name, toAVRO(csvStringArray[i], columns[i]));
}
return avroObject;
}
@@ -250,56 +252,59 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
+
+ String name = SqoopAvroUtils.createAvroName(columns[i].getName());
+
if (objectArray[i] == null) {
- avroObject.put(columns[i].getName(), null);
+ avroObject.put(name, null);
continue;
}
switch (columns[i].getType()) {
case ARRAY:
case SET:
- avroObject.put(columns[i].getName(), toList((Object[]) objectArray[i]));
+ avroObject.put(name, toList((Object[]) objectArray[i]));
break;
case ENUM:
GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(columns[i]),
(String) objectArray[i]);
- avroObject.put(columns[i].getName(), enumValue);
+ avroObject.put(name, enumValue);
break;
case TEXT:
- avroObject.put(columns[i].getName(), new Utf8((String) objectArray[i]));
+ avroObject.put(name, new Utf8((String) objectArray[i]));
break;
case BINARY:
case UNKNOWN:
- avroObject.put(columns[i].getName(), ByteBuffer.wrap((byte[]) objectArray[i]));
+ avroObject.put(name, ByteBuffer.wrap((byte[]) objectArray[i]));
break;
case MAP:
case FIXED_POINT:
case FLOATING_POINT:
- avroObject.put(columns[i].getName(), objectArray[i]);
+ avroObject.put(name, objectArray[i]);
break;
case DECIMAL:
// TODO: store as FIXED in SQOOP-16161
- avroObject.put(columns[i].getName(), ((BigDecimal) objectArray[i]).toPlainString());
+ avroObject.put(name, ((BigDecimal) objectArray[i]).toPlainString());
break;
case DATE_TIME:
if (objectArray[i] instanceof org.joda.time.DateTime) {
- avroObject.put(columns[i].getName(), ((org.joda.time.DateTime) objectArray[i]).toDate()
+ avroObject.put(name, ((org.joda.time.DateTime) objectArray[i]).toDate()
.getTime());
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
- avroObject.put(columns[i].getName(), ((org.joda.time.LocalDateTime) objectArray[i])
+ avroObject.put(name, ((org.joda.time.LocalDateTime) objectArray[i])
.toDate().getTime());
}
break;
case TIME:
- avroObject.put(columns[i].getName(), ((org.joda.time.LocalTime) objectArray[i])
+ avroObject.put(name, ((org.joda.time.LocalTime) objectArray[i])
.toDateTimeToday().getMillis());
break;
case DATE:
- avroObject.put(columns[i].getName(), ((org.joda.time.LocalDate) objectArray[i]).toDate()
+ avroObject.put(name, ((org.joda.time.LocalDate) objectArray[i]).toDate()
.getTime());
break;
case BIT:
- avroObject.put(columns[i].getName(), Boolean.valueOf(objectArray[i].toString()));
+ avroObject.put(name, Boolean.valueOf(objectArray[i].toString()));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
@@ -317,7 +322,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
StringBuilder csvString = new StringBuilder();
for (int i = 0; i < columns.length; i++) {
- Object obj = record.get(columns[i].getName());
+ Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName()));
if (obj == null && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
@@ -396,8 +401,8 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
Object[] object = new Object[columns.length];
for (int i = 0; i < columns.length; i++) {
- Object obj = record.get(columns[i].getName());
- Integer nameIndex = schema.getColumnNameIndex(columns[i].getName());
+ Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName()));
+ Integer nameIndex = schema.getColumnNameIndex(SqoopAvroUtils.createAvroName(columns[i].getName()));
Column column = columns[nameIndex];
// null is a possible value
if (obj == null && !column.isNullable()) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e06190b2/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
index 3c4d7de..cd4445d 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
@@ -39,6 +39,7 @@ import org.apache.sqoop.schema.type.Decimal;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;
import org.joda.time.LocalDateTime;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -545,4 +546,10 @@ public class TestAVROIntermediateDataFormat {
dataFormat.getData();
}
+ @Test
+ public void testSchemaWithBadCharacters() {
+ Schema schema = new Schema("9`\" blah`^&*(^&*(%$^&").addColumn(new Text("one").setNullable(false));
+ AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(schema);
+ Assert.assertEquals(dataFormat.getAvroSchema().getName(), "blah");
+ }
}