You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/08/31 08:24:06 UTC
svn commit: r1379307 - in /sqoop/branches/sqoop2:
connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/
core/ core/src/main/java/org/apache/sqoop/core/
core/src/main/java/org/apache/sqoop/job/ core/src/main/java/org/apache/sqo...
Author: jarcec
Date: Fri Aug 31 06:24:05 2012
New Revision: 1379307
URL: http://svn.apache.org/viewvc?rev=1379307&view=rev
Log:
SQOOP-588. Provide MapReduce classes for executing ETL classes.
(Bilung Lee via Jarek Jarcec Cecho)
Added:
sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/
sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
Modified:
sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
sqoop/branches/sqoop2/core/pom.xml
sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java
sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
Modified: sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java (original)
+++ sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java Fri Aug 31 06:24:05 2012
@@ -20,7 +20,6 @@ package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.job.etl.Context;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.job.io.DataWriter;
public class GenericJdbcExportLoader extends Loader {
Added: sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java (added)
+++ sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.sqoop.job.etl.Partition;
+
+public class GenericJdbcImportPartition extends Partition {
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+}
Modified: sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java (original)
+++ sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java Fri Aug 31 06:24:05 2012
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.connector.jdbc;
+import java.util.LinkedList;
import java.util.List;
import org.apache.sqoop.job.etl.Context;
@@ -28,7 +29,7 @@ public class GenericJdbcImportPartitione
@Override
public List<Partition> run(Context context) {
// TODO Auto-generated method stub
- return null;
+ return new LinkedList<Partition>();
}
}
Modified: sqoop/branches/sqoop2/core/pom.xml
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/pom.xml?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/core/pom.xml (original)
+++ sqoop/branches/sqoop2/core/pom.xml Fri Aug 31 06:24:05 2012
@@ -43,6 +43,16 @@ limitations under the License.
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</dependency>
Modified: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java (original)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java Fri Aug 31 06:24:05 2012
@@ -49,7 +49,43 @@ public enum CoreError implements ErrorCo
CORE_0006("Properties configuration provider unable to load config file"),
/** The configuration system has not been initialized correctly. */
- CORE_0007("System not initialized");
+ CORE_0007("System not initialized"),
+
+ /** Error occurs during job execution. */
+ CORE_0008("Error occurs during job execution"),
+
+ /** The system was unable to load the specified class. */
+ CORE_0009("Unable to load the specified class"),
+
+ /** The system was unable to instantiate the specified class. */
+ CORE_0010("Unable to instantiate the specified class"),
+
+ /** The parameter already exists in the context */
+ CORE_0011("The parameter already exists in the context"),
+
+ /** The data type is not supported */
+ CORE_0012("The data type is not supported"),
+
+ /** Cannot write to the data writer */
+ CORE_0013("Cannot write to the data writer"),
+
+ /** Cannot read from the data reader */
+ CORE_0014("Cannot read from the data reader"),
+
+ /** Unable to write data due to interrupt */
+ CORE_0015("Unable to write data due to interrupt"),
+
+ /** Unable to read data due to interrupt */
+ CORE_0016("Unable to read data due to interrupt"),
+
+ /** Error occurs during extractor run */
+ CORE_0017("Error occurs during extractor run"),
+
+ /** Error occurs during loader run */
+ CORE_0018("Error occurs during loader run"),
+
+ /** Data have not been completely consumed yet */
+ CORE_0019("Data have not been completely consumed yet");
private final String message;
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.core.ConfigurationConstants;
+
+public final class JobConstants {
+
+ /**
+ * All job related configuration is prefixed with this:
+ * <tt>org.apache.sqoop.job.</tt>
+ */
+ public static final String PREFIX_JOB_CONFIG =
+ ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
+
+ public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
+ + "etl.partitioner";
+
+ public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
+ + "etl.extractor";
+
+ public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
+ + "etl.loader";
+
+ public static final String JOB_ETL_FIELD_NAMES = PREFIX_JOB_CONFIG
+ + "etl.field.names";
+
+ public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_JOB_CONFIG
+ + "etl.output.directory";
+
+ private JobConstants() {
+ // Disable explicit object creation
+ }
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.job.JobConstants;
+
+/**
+ * An immutable context used in the ETL framework
+ * for accessing configuration values.
+ */
+public class EtlContext implements Context {
+
+ protected Configuration conf;
+
+ public EtlContext(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public String getString(String key) {
+ return conf.get(key);
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return StringUtils.getStrings(getString(JobConstants.JOB_ETL_FIELD_NAMES));
+ }
+
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+
+/**
+ * A mutable context used in the ETL framework,
+ * for example during configuration initialization.
+ */
+public class EtlMutableContext extends EtlContext implements MutableContext {
+
+ public EtlMutableContext(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public void setString(String key, String value) {
+ if (conf.get(key) != null) {
+ throw new SqoopException(CoreError.CORE_0011, key);
+ }
+
+ conf.set(key, value);
+ }
+
+ @Override
+ public void setFieldNames(String[] names) {
+ setString(JobConstants.JOB_ETL_FIELD_NAMES, StringUtils.arrayToString(names));
+ }
+
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+
+public class Data implements WritableComparable<Data> {
+
+ // The content is an Object to accommodate different kinds of data.
+ // For example, it can be:
+ // - Object[] for a record represented as an array of objects
+ // - String for a record represented as CSV text
+ private Object content = null;
+
+ private static final int EMPTY_DATA = 0;
+ private static final int CSV_RECORD = 1;
+ private static final int ARRAY_RECORD = 2;
+ private int type = EMPTY_DATA;
+
+ private static char FIELD_DELIMITER = ',';
+ private static char RECORD_DELIMITER = '\n';
+
+ public void setContent(Object content) {
+ if (content == null) {
+ this.type = EMPTY_DATA;
+ } else if (content instanceof String) {
+ this.type = CSV_RECORD;
+ } else if (content instanceof Object[]) {
+ this.type = ARRAY_RECORD;
+ } else {
+ throw new SqoopException(CoreError.CORE_0012,
+ content.getClass().getName());
+ }
+ this.content = content;
+ }
+
+ public Object getContent() {
+ return content;
+ }
+
+ public int getType() {
+ return type;
+ }
+
+ public boolean isEmpty() {
+ return (type == EMPTY_DATA);
+ }
+
+ @Override
+ public int compareTo(Data other) {
+ byte[] myBytes = toString().getBytes(Charset.forName("UTF-8"));
+ byte[] otherBytes = other.toString().getBytes(Charset.forName("UTF-8"));
+ return WritableComparator.compareBytes(
+ myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Data)) {
+ return false;
+ }
+
+ Data data = (Data)other;
+ if (type != data.getType()) {
+ return false;
+ }
+
+ return toString().equals(data.toString());
+ }
+
+ /**
+  * Computes a hash code from the record type and the rendered record text,
+  * which is the same state that {@link #equals(Object)} compares, so that
+  * equal records always produce equal hash codes.
+  *
+  * @return hash code derived from the type and {@link #toString()}
+  * @throws SqoopException with CORE_0012 when the data is empty or of an
+  *         unsupported type (raised by {@link #toString()})
+  */
+ @Override
+ public int hashCode() {
+   // Do not mix in Object.hashCode(): it is identity-based and would give
+   // two equal records different hash codes.
+   return 31 * type + toString().hashCode();
+ }
+
+ /**
+  * Renders the record as text terminated by the record delimiter.
+  * <p>
+  * A CSV record is returned as its string content followed by the record
+  * delimiter. An array record is rendered column by column, separated by
+  * the field delimiter: strings are wrapped in single quotes with every
+  * embedded single quote preceded by a backslash, byte arrays are rendered
+  * with {@link Arrays#toString(byte[])}, and any other value is rendered
+  * with its own string form. A null column is rendered as "null".
+  *
+  * @return the textual form of the record
+  * @throws SqoopException with CORE_0012 when the data is empty or of an
+  *         unsupported type
+  */
+ @Override
+ public String toString() {
+   switch (type) {
+   case CSV_RECORD:
+     return (String)content + RECORD_DELIMITER;
+   case ARRAY_RECORD:
+     StringBuilder sb = new StringBuilder();
+     Object[] array = (Object[])content;
+     for (int i = 0; i < array.length; i++) {
+       if (i != 0) {
+         sb.append(FIELD_DELIMITER);
+       }
+
+       if (array[i] instanceof String) {
+         sb.append("\'");
+         sb.append(((String)array[i]).replaceAll(
+             "\'", Matcher.quoteReplacement("\\\'")));
+         sb.append("\'");
+       } else if (array[i] instanceof byte[]) {
+         sb.append(Arrays.toString((byte[])array[i]));
+       } else {
+         // append(Object) is null-safe; calling toString() directly would
+         // throw on a null column, which the serialized form supports.
+         sb.append(array[i]);
+       }
+     }
+     sb.append(RECORD_DELIMITER);
+     return sb.toString();
+   default:
+     throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+   }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ type = readType(in);
+ switch (type) {
+ case CSV_RECORD:
+ readCsv(in);
+ break;
+ case ARRAY_RECORD:
+ readArray(in);
+ break;
+ default:
+ throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ writeType(out, type);
+ switch (type) {
+ case CSV_RECORD:
+ writeCsv(out);
+ break;
+ case ARRAY_RECORD:
+ writeArray(out);
+ break;
+ default:
+ throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+ }
+ }
+
+ private int readType(DataInput in) throws IOException {
+ return WritableUtils.readVInt(in);
+ }
+
+ private void writeType(DataOutput out, int type) throws IOException {
+ WritableUtils.writeVInt(out, type);
+ }
+
+ private void readCsv(DataInput in) throws IOException {
+ content = in.readUTF();
+ }
+
+ private void writeCsv(DataOutput out) throws IOException {
+ out.writeUTF((String)content);
+ }
+
+ private void readArray(DataInput in) throws IOException {
+ // read number of columns
+ int columns = in.readInt();
+ content = new Object[columns];
+ Object[] array = (Object[])content;
+ // read each column
+ for (int i = 0; i < array.length; i++) {
+ int type = readType(in);
+ switch (type) {
+ case FieldTypes.UTF:
+ array[i] = in.readUTF();
+ break;
+
+ case FieldTypes.BIN:
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ array[i] = bytes;
+ break;
+
+ case FieldTypes.DOUBLE:
+ array[i] = in.readDouble();
+ break;
+
+ case FieldTypes.FLOAT:
+ array[i] = in.readFloat();
+ break;
+
+ case FieldTypes.LONG:
+ array[i] = in.readLong();
+ break;
+
+ case FieldTypes.INT:
+ array[i] = in.readInt();
+ break;
+
+ case FieldTypes.SHORT:
+ array[i] = in.readShort();
+ break;
+
+ case FieldTypes.CHAR:
+ array[i] = in.readChar();
+ break;
+
+ case FieldTypes.BYTE:
+ array[i] = in.readByte();
+ break;
+
+ case FieldTypes.BOOLEAN:
+ array[i] = in.readBoolean();
+ break;
+
+ case FieldTypes.NULL:
+ array[i] = null;
+ break;
+
+ default:
+ throw new IOException(
+ new SqoopException(CoreError.CORE_0012, Integer.toString(type))
+ );
+ }
+ }
+ }
+
+ private void writeArray(DataOutput out) throws IOException {
+ Object[] array = (Object[])content;
+ // write number of columns
+ out.writeInt(array.length);
+ // write each column
+ for (int i = 0; i < array.length; i++) {
+ if (array[i] instanceof String) {
+ writeType(out, FieldTypes.UTF);
+ out.writeUTF((String)array[i]);
+
+ } else if (array[i] instanceof byte[]) {
+ writeType(out, FieldTypes.BIN);
+ out.writeInt(((byte[])array[i]).length);
+ out.write((byte[])array[i]);
+
+ } else if (array[i] instanceof Double) {
+ writeType(out, FieldTypes.DOUBLE);
+ out.writeDouble((Double)array[i]);
+
+ } else if (array[i] instanceof Float) {
+ writeType(out, FieldTypes.FLOAT);
+ out.writeFloat((Float)array[i]);
+
+ } else if (array[i] instanceof Long) {
+ writeType(out, FieldTypes.LONG);
+ out.writeLong((Long)array[i]);
+
+ } else if (array[i] instanceof Integer) {
+ writeType(out, FieldTypes.INT);
+ out.writeInt((Integer)array[i]);
+
+ } else if (array[i] instanceof Short) {
+ writeType(out, FieldTypes.SHORT);
+ out.writeShort((Short)array[i]);
+
+ } else if (array[i] instanceof Character) {
+ writeType(out, FieldTypes.CHAR);
+ out.writeChar((Character)array[i]);
+
+ } else if (array[i] instanceof Byte) {
+ writeType(out, FieldTypes.BYTE);
+ out.writeByte((Byte)array[i]);
+
+ } else if (array[i] instanceof Boolean) {
+ writeType(out, FieldTypes.BOOLEAN);
+ out.writeBoolean((Boolean)array[i]);
+
+ } else if (array[i] == null) {
+ writeType(out, FieldTypes.NULL);
+
+ } else {
+ throw new IOException(
+ new SqoopException(
+ CoreError.CORE_0012, array[i].getClass().getName()
+ )
+ );
+ }
+ }
+ }
+
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+public final class FieldTypes {
+
+ public static final int NULL = 0;
+
+ public static final int BOOLEAN = 1;
+
+ public static final int BYTE = 10;
+ public static final int CHAR = 11;
+
+ public static final int SHORT = 20;
+ public static final int INT = 21;
+ public static final int LONG = 22;
+
+ public static final int FLOAT = 50;
+ public static final int DOUBLE = 51;
+
+ public static final int BIN = 100;
+ public static final int UTF = 101;
+
+ private FieldTypes() {
+ // Disable explicit object creation
+ }
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * An output format for a MapReduce job.
+ */
+public class SqoopFileOutputFormat
+ extends FileOutputFormat<Data, NullWritable> {
+
+ public static final Log LOG =
+ LogFactory.getLog(SqoopFileOutputFormat.class.getName());
+
+ @Override
+ public RecordWriter<Data, NullWritable> getRecordWriter(
+ TaskAttemptContext context) {
+ SqoopOutputFormatLoadExecutor executor =
+ new SqoopOutputFormatLoadExecutor(context);
+ return executor.getRecordWriter();
+ }
+
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.EtlContext;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+/**
+ * An InputFormat for a MapReduce job.
+ */
+public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
+
+ public static final Log LOG =
+ LogFactory.getLog(SqoopInputFormat.class.getName());
+
+ @Override
+ public RecordReader<SqoopSplit, NullWritable> createRecordReader(
+ InputSplit split, TaskAttemptContext context) {
+ return new SqoopRecordReader();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+
+ String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
+ Class<?> clz = ClassLoadingUtils.loadClass(partitionerName);
+ if (clz == null) {
+ throw new SqoopException(CoreError.CORE_0009, partitionerName);
+ }
+
+ Partitioner partitioner;
+ try {
+ partitioner = (Partitioner) clz.newInstance();
+ } catch (Exception e) {
+ throw new SqoopException(CoreError.CORE_0010, partitionerName, e);
+ }
+
+ List<Partition> partitions = partitioner.run(new EtlContext(conf));
+ List<InputSplit> splits = new LinkedList<InputSplit>();
+ for (Partition partition : partitions) {
+ SqoopSplit split = new SqoopSplit();
+ split.setPartition(partition);
+ splits.add(split);
+ }
+
+ return splits;
+ }
+
+ public static class SqoopRecordReader
+ extends RecordReader<SqoopSplit, NullWritable> {
+
+ private boolean delivered = false;
+ private SqoopSplit split = null;
+
+ @Override
+ public boolean nextKeyValue() {
+ if (delivered) {
+ return false;
+ } else {
+ delivered = true;
+ return true;
+ }
+ }
+
+ @Override
+ public SqoopSplit getCurrentKey() {
+ return split;
+ }
+
+ @Override
+ public NullWritable getCurrentValue() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public float getProgress() {
+ return delivered ? 1.0f : 0.0f;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ this.split = (SqoopSplit)split;
+ }
+ }
+
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.EtlContext;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+/**
+ * Mapper that runs the configured {@link Extractor} on the partition carried
+ * by its input split and emits every extracted record as a {@link Data} key.
+ */
+public class SqoopMapper
+    extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopMapper.class.getName());
+
+  /**
+   * Instantiates the extractor named by
+   * {@link JobConstants#JOB_ETL_EXTRACTOR} and lets it process the partition
+   * of the current split. Any failure raised while extracting is wrapped in a
+   * {@link SqoopException} with code CORE_0017.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Extractor extractor =
+        createExtractor(conf.get(JobConstants.JOB_ETL_EXTRACTOR));
+    SqoopSplit currentSplit = context.getCurrentKey();
+
+    try {
+      EtlContext etlContext = new EtlContext(conf);
+      extractor.run(etlContext, currentSplit.getPartition(),
+          new MapDataWriter(context));
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0017, e);
+    }
+  }
+
+  /**
+   * Loads the extractor class by name and creates an instance through its
+   * no-argument constructor.
+   */
+  private Extractor createExtractor(String extractorName) {
+    Class<?> extractorClass = ClassLoadingUtils.loadClass(extractorName);
+    if (extractorClass == null) {
+      throw new SqoopException(CoreError.CORE_0009, extractorName);
+    }
+
+    try {
+      return (Extractor) extractorClass.newInstance();
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0010, extractorName, e);
+    }
+  }
+
+  /**
+   * Data writer handed to the extractor; every record written to it is
+   * forwarded to the map output through the mapper context.
+   */
+  public class MapDataWriter extends DataWriter {
+    private Context context;
+    // Created on first use and then reused for every emitted record.
+    private Data data;
+
+    public MapDataWriter(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public void writeArrayRecord(Object[] record) {
+      emit(record);
+    }
+
+    @Override
+    public void writeCsvRecord(String csv) {
+      emit(csv);
+    }
+
+    /** Stores the content in the shared holder and writes it out. */
+    private void emit(Object content) {
+      if (data == null) {
+        data = new Data();
+      }
+      data.setContent(content);
+
+      try {
+        context.write(data, NullWritable.get());
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0013, e);
+      }
+    }
+  }
+
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * Output format that produces no files: records are handed to a
+ * {@link SqoopOutputFormatLoadExecutor} and the output committer is a no-op.
+ */
+public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopNullOutputFormat.class.getName());
+
+  /** Creates a load executor for the task and returns its record writer. */
+  @Override
+  public RecordWriter<Data, NullWritable> getRecordWriter(
+      TaskAttemptContext context) {
+    return new SqoopOutputFormatLoadExecutor(context).getRecordWriter();
+  }
+
+  /** Nothing to validate for this output format. */
+  @Override
+  public void checkOutputSpecs(JobContext context) {
+    // intentionally empty
+  }
+
+  /** Returns a committer whose operations all do nothing. */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    return new NullOutputCommitter();
+  }
+
+  /** Committer with empty callbacks that never requests a task commit. */
+  class NullOutputCommitter extends OutputCommitter {
+    @Override
+    public void setupJob(JobContext jobContext) {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) {
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) {
+    }
+  }
+
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.EtlContext;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+public class SqoopOutputFormatLoadExecutor {
+
+ public static final Log LOG =
+ LogFactory.getLog(SqoopOutputFormatLoadExecutor.class.getName());
+
+ private boolean readerFinished;
+ private boolean writerFinished;
+ private Data data;
+ private JobContext context;
+ private SqoopRecordWriter producer;
+ private ConsumerThread consumer;
+
+ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
+ data = new Data();
+ context = jobctx;
+ producer = new SqoopRecordWriter();
+ consumer = new ConsumerThread();
+ }
+
+ public RecordWriter<Data, NullWritable> getRecordWriter() {
+ consumer.setDaemon(true);
+ consumer.start();
+ return producer;
+ }
+
+ public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
+ @Override
+ public void write(Data key, NullWritable value) {
+ synchronized (data) {
+ if (readerFinished) {
+ consumer.checkException();
+ return;
+ }
+
+ try {
+ if (!data.isEmpty()) {
+ // wait for reader to consume data
+ data.wait();
+ }
+
+ data.setContent(key.getContent());
+
+ // notify reader that the data is ready
+ data.notify();
+
+ } catch (InterruptedException e) {
+ // inform reader that writer is finished
+ writerFinished = true;
+
+ // unlock reader so it can continue
+ data.notify();
+
+ // throw exception
+ throw new SqoopException(CoreError.CORE_0015, e);
+ }
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) {
+ synchronized (data) {
+ if (readerFinished) {
+ consumer.checkException();
+ return;
+ }
+
+ try {
+ if (!data.isEmpty()) {
+ // wait for reader to consume data
+ data.wait();
+ }
+
+ writerFinished = true;
+
+ data.notify();
+
+ } catch (InterruptedException e) {
+ // inform reader that writer is finished
+ writerFinished = true;
+
+ // unlock reader so it can continue
+ data.notify();
+
+ // throw exception
+ throw new SqoopException(CoreError.CORE_0015, e);
+ }
+ }
+ }
+ }
+
+ public class OutputFormatDataReader extends DataReader {
+ @Override
+ public Object[] readArrayRecord() {
+ return (Object[])readContent();
+ }
+
+ @Override
+ public String readCsvRecord() {
+ return (String)readContent();
+ }
+
+ private Object readContent() {
+ synchronized (data) {
+ if (writerFinished) {
+ return null;
+ }
+
+ try {
+ if (data.isEmpty()) {
+ // wait for writer to produce data
+ data.wait();
+ }
+
+ Object content = data.getContent();
+ data.setContent(null);
+
+ // notify writer that data is consumed
+ data.notify();
+
+ return content;
+
+ } catch (InterruptedException e) {
+ // inform writer that reader is finished
+ readerFinished = true;
+
+ // unlock writer so it can continue
+ data.notify();
+
+ // throw exception
+ throw new SqoopException(CoreError.CORE_0016, e);
+ }
+ }
+ }
+ }
+
+ public class ConsumerThread extends Thread {
+ private SqoopException exception = null;
+
+ public void checkException() {
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ @Override
+ public void run() {
+ DataReader reader = new OutputFormatDataReader();
+
+ Configuration conf = context.getConfiguration();
+
+ try {
+ String loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
+ Class<?> clz = ClassLoadingUtils.loadClass(loaderName);
+ if (clz == null) {
+ throw new SqoopException(CoreError.CORE_0009, loaderName);
+ }
+
+ Loader loader;
+ try {
+ loader = (Loader) clz.newInstance();
+ } catch (Exception e) {
+ throw new SqoopException(CoreError.CORE_0010, loaderName, e);
+ }
+
+ try {
+ loader.run(new EtlContext(conf), reader);
+
+ } catch (Throwable t) {
+ throw new SqoopException(CoreError.CORE_0018, t);
+ }
+
+ } catch (SqoopException e) {
+ exception = e;
+ }
+
+ synchronized (data) {
+ // inform writer that reader is finished
+ readerFinished = true;
+
+ // unlock writer so it can continue
+ data.notify();
+
+ // if no exception happens yet
+ if (exception == null && !writerFinished) {
+ // create exception if data are not all consumed
+ exception = new SqoopException(CoreError.CORE_0019);
+ }
+
+ // throw deferred exception if exist
+ if (exception != null) {
+ throw exception;
+ }
+ }
+ }
+ }
+
+}
Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+/**
+ * Input split that wraps a connector {@link Partition}. It is serialized as
+ * the partition's class name followed by the partition's own content.
+ */
+public class SqoopSplit extends InputSplit implements Writable {
+
+  private Partition partition;
+
+  public Partition getPartition() {
+    return partition;
+  }
+
+  public void setPartition(Partition partition) {
+    this.partition = partition;
+  }
+
+  /** Always reports a length of zero. */
+  @Override
+  public long getLength() throws IOException {
+    return 0;
+  }
+
+  /** Always reports an empty list of locations. */
+  @Override
+  public String[] getLocations() throws IOException {
+    return new String[0];
+  }
+
+  /**
+   * Writes the partition's class name and then delegates to the partition
+   * to write its own fields.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    String partitionClassName = partition.getClass().getName();
+    out.writeUTF(partitionClassName);
+    partition.write(out);
+  }
+
+  /**
+   * Reads the partition class name, creates an instance of that class and
+   * lets it read its own fields from the input.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String partitionClassName = in.readUTF();
+    partition = newPartition(partitionClassName);
+    partition.readFields(in);
+  }
+
+  /** Loads the named class and instantiates it via its no-arg constructor. */
+  private Partition newPartition(String className) {
+    Class<?> partitionClass = ClassLoadingUtils.loadClass(className);
+    if (partitionClass == null) {
+      throw new SqoopException(CoreError.CORE_0009, className);
+    }
+
+    try {
+      return (Partition) partitionClass.newInstance();
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0010, className, e);
+    }
+  }
+
+}
Added: sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java (added)
+++ sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.etl.Context;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+import org.apache.sqoop.job.mr.SqoopSplit;
+import org.junit.Test;
+
+/**
+ * Tests the MapReduce glue classes (input format, mapper and output format)
+ * with dummy partitioner, extractor and loader implementations.
+ */
+public class TestMapReduce {
+
+  private static final int START_ID = 1;
+  private static final int NUMBER_OF_IDS = 9;
+  private static final int NUMBER_OF_ROWS_PER_ID = 10;
+
+  /** The input format must yield one split per partition, in order. */
+  @Test
+  public void testInputFormat() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    Job job = Job.getInstance(conf);
+
+    SqoopInputFormat inputformat = new SqoopInputFormat();
+    List<InputSplit> splits = inputformat.getSplits(job);
+    // Use the constant so the check follows the partitioner's loop bound.
+    Assert.assertEquals(NUMBER_OF_IDS, splits.size());
+
+    for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+      SqoopSplit split = (SqoopSplit)splits.get(id-1);
+      DummyPartition partition = (DummyPartition)split.getPartition();
+      Assert.assertEquals(id, partition.getId());
+    }
+  }
+
+  /** Runs a job whose output is verified by {@link DummyOutputFormat}. */
+  @Test
+  public void testMapper() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+
+    Job job = Job.getInstance(conf);
+    job.setInputFormatClass(SqoopInputFormat.class);
+    job.setMapperClass(SqoopMapper.class);
+    job.setMapOutputKeyClass(Data.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(DummyOutputFormat.class);
+    job.setOutputKeyClass(Data.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    boolean success = job.waitForCompletion(true);
+    Assert.assertTrue("Job failed!", success);
+  }
+
+  /** Runs a job whose output is consumed and verified by {@link DummyLoader}. */
+  @Test
+  public void testOutputFormat() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+
+    Job job = Job.getInstance(conf);
+    job.setInputFormatClass(SqoopInputFormat.class);
+    job.setMapperClass(SqoopMapper.class);
+    job.setMapOutputKeyClass(Data.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(SqoopNullOutputFormat.class);
+    job.setOutputKeyClass(Data.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    boolean success = job.waitForCompletion(true);
+    Assert.assertTrue("Job failed!", success);
+  }
+
+  /** Partition identified by a single integer id. */
+  public static class DummyPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+  }
+
+  /** Creates one partition for every id from START_ID to NUMBER_OF_IDS. */
+  public static class DummyPartitioner extends Partitioner {
+    @Override
+    public List<Partition> run(Context context) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+        DummyPartition partition = new DummyPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  /**
+   * Emits NUMBER_OF_ROWS_PER_ID rows per partition; each row holds the same
+   * number as a String, an Integer and a Double.
+   */
+  public static class DummyExtractor extends Extractor {
+    @Override
+    public void run(Context context, Partition partition, DataWriter writer) {
+      int id = ((DummyPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
+        int number = id*NUMBER_OF_ROWS_PER_ID+row;
+        Object[] array = new Object[] {
+          String.valueOf(number),
+          Integer.valueOf(number),
+          Double.valueOf(number)
+        };
+        writer.writeArrayRecord(array);
+      }
+    }
+  }
+
+  /** Output format whose record writer checks each record it receives. */
+  public static class DummyOutputFormat
+      extends OutputFormat<Data, NullWritable> {
+    @Override
+    public void checkOutputSpecs(JobContext context) {
+      // do nothing
+    }
+
+    @Override
+    public RecordWriter<Data, NullWritable> getRecordWriter(
+        TaskAttemptContext context) {
+      return new DummyRecordWriter();
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+      return new DummyOutputCommitter();
+    }
+
+    /**
+     * Compares every written key with the record expected for a counter
+     * that starts at START_ID*NUMBER_OF_ROWS_PER_ID and grows by one.
+     */
+    public static class DummyRecordWriter
+        extends RecordWriter<Data, NullWritable> {
+      private int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+      private Data data = new Data();
+
+      @Override
+      public void write(Data key, NullWritable value) {
+        Object[] record = new Object[] {
+          String.valueOf(index),
+          Integer.valueOf(index),
+          Double.valueOf(index)
+        };
+        data.setContent(record);
+        Assert.assertEquals(data.toString(), key.toString());
+        index++;
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) {
+        // do nothing
+      }
+    }
+
+    /** Committer whose callbacks are all empty. */
+    public static class DummyOutputCommitter extends OutputCommitter {
+      @Override
+      public void setupJob(JobContext jobContext) { }
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Loader that reads array records until the reader returns null and checks
+   * each one against the record expected for a running counter.
+   */
+  public static class DummyLoader extends Loader {
+    private int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    private Data expected = new Data();
+    private Data actual = new Data();
+
+    @Override
+    public void run(Context context, DataReader reader) {
+      Object[] array;
+      while ((array = reader.readArrayRecord()) != null) {
+        actual.setContent(array);
+
+        expected.setContent(new Object[] {
+          String.valueOf(index),
+          Integer.valueOf(index),
+          Double.valueOf(index)});
+        index++;
+
+        Assert.assertEquals(expected.toString(), actual.toString());
+      }
+    }
+  }
+
+}
Modified: sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java (original)
+++ sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java Fri Aug 31 06:24:05 2012
@@ -24,4 +24,6 @@ public interface Context {
public String getString(String key);
+ public String[] getFieldNames();
+
}
Modified: sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java (original)
+++ sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java Fri Aug 31 06:24:05 2012
@@ -19,20 +19,21 @@ package org.apache.sqoop.job.etl;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.IOException;
/**
* A part of the input data partitioned by the Partitioner.
*/
-public interface Partition {
+public abstract class Partition {
/**
* Deserialize the fields of this partition from input.
*/
- public void readFields(DataInput in);
+ public abstract void readFields(DataInput in) throws IOException;
/**
* Serialize the fields of this partition to output.
*/
- public void write(DataOutput out);
+ public abstract void write(DataOutput out) throws IOException;
}
Modified: sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java (original)
+++ sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java Fri Aug 31 06:24:05 2012
@@ -21,6 +21,10 @@ package org.apache.sqoop.job.io;
* An intermediate layer for passing data from the MR framework
* to the ETL framework.
*/
-public interface DataReader {
+public abstract class DataReader {
+
+ public abstract Object[] readArrayRecord();
+
+ public abstract String readCsvRecord();
}
Modified: sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java (original)
+++ sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java Fri Aug 31 06:24:05 2012
@@ -21,6 +21,10 @@ package org.apache.sqoop.job.io;
* An intermediate layer for passing data from the ETL framework
* to the MR framework.
*/
-public interface DataWriter {
+public abstract class DataWriter {
+
+ public abstract void writeArrayRecord(Object[] array);
+
+ public abstract void writeCsvRecord(String csv);
}