Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/20 22:28:26 UTC
[2/3] SQOOP-1375: Sqoop2: From/To: Create HDFS connector
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
new file mode 100644
index 0000000..eb80121
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+
+import java.io.IOException;
+
+public class HdfsSequenceWriter extends GenericHdfsWriter {
+
+ private SequenceFile.Writer filewriter;
+ private Text text;
+
+ public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+ if (codec != null) {
+ filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+ conf, filepath, Text.class, NullWritable.class,
+ SequenceFile.CompressionType.BLOCK, codec);
+ } else {
+ filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+ conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE);
+ }
+
+ text = new Text();
+ }
+
+ @Override
+ public void write(String csv) throws IOException {
+ text.set(csv);
+ filewriter.append(text, NullWritable.get());
+ }
+
+ public void destroy() throws IOException {
+ filewriter.close();
+ }
+}
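The writer follows a simple initialize/write/destroy lifecycle inherited from GenericHdfsWriter. A minimal usage sketch, assuming the base class declares exactly those three methods and that the caller supplies the Hadoop Configuration and target Path (the path and record below are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
    import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;

    public class SequenceWriterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path target = new Path("/tmp/sqoop-output/part-00000.seq"); // illustrative path

        GenericHdfsWriter writer = new HdfsSequenceWriter();
        writer.initialize(target, conf, null);  // null codec takes the CompressionType.NONE branch
        try {
          writer.write("1,'Alice',2014-08-20"); // one CSV record per call
        } finally {
          writer.destroy();                     // closes the underlying SequenceFile.Writer
        }
      }
    }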
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
new file mode 100644
index 0000000..78cf973
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import com.google.common.base.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.connector.hdfs.HdfsConstants;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+public class HdfsTextWriter extends GenericHdfsWriter {
+
+ private BufferedWriter filewriter;
+
+ @Override
+ public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+ FileSystem fs = filepath.getFileSystem(conf);
+
+ DataOutputStream filestream = fs.create(filepath, false);
+ if (codec != null) {
+ filewriter = new BufferedWriter(new OutputStreamWriter(
+ codec.createOutputStream(filestream, codec.createCompressor()),
+ Charsets.UTF_8));
+ } else {
+ filewriter = new BufferedWriter(new OutputStreamWriter(
+ filestream, Charsets.UTF_8));
+ }
+ }
+
+ @Override
+ public void write(String csv) throws IOException {
+ filewriter.write(csv + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+
+ }
+
+ @Override
+ public void destroy() throws IOException {
+ filewriter.close();
+ }
+}
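HdfsTextWriter wraps the raw HDFS output stream in the codec's compressed stream when a codec is supplied. A hedged sketch of how a caller might obtain a codec to pass in; DefaultCodec is only an example, and ReflectionUtils.newInstance is used so the codec picks up the Configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;

    public class TextWriterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Instantiate the codec through ReflectionUtils so it is configured properly.
        CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);

        HdfsTextWriter writer = new HdfsTextWriter();
        writer.initialize(new Path("/tmp/sqoop-output/part-00000.deflate"), conf, codec);
        writer.write("2,'Bob',2014-08-20"); // the record delimiter is appended by the writer
        writer.destroy();
      }
    }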
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
new file mode 100644
index 0000000..3125911
--- /dev/null
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generic HDFS Connector Resources
+
+############################
+# Connection Form
+#
+connection.label = Connection configuration
+connection.help = You must supply the information requested in order to \
+ create a connection object.
+
+connection.dummy.label = Dummy parameter needed to get HDFS connector to register
+connection.dummy.help = You can write anything here. Doesn't matter.
+
+# Output Form
+#
+output.label = Output configuration
+output.help = You must supply the information requested in order to \
+  specify where you want to store your data.
+
+output.storageType.label = Storage type
+output.storageType.help = Target in the Hadoop ecosystem where the data should be stored
+
+output.outputFormat.label = Output format
+output.outputFormat.help = Format in which data should be serialized
+
+output.compression.label = Compression format
+output.compression.help = Compression that should be used for the data
+
+output.customCompression.label = Custom compression format
+output.customCompression.help = Full class name of the custom compression
+
+output.outputDirectory.label = Output directory
+output.outputDirectory.help = Output directory for final data
+
+output.ignored.label = Ignored
+output.ignored.help = This value is ignored
+
+# Input Form
+#
+input.label = Input configuration
+input.help = Specifies information required to get data from the Hadoop ecosystem
+
+input.inputDirectory.label = Input directory
+input.inputDirectory.help = Directory that should be exported
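These keys follow Sqoop2's resource-bundle convention: each <form>.<input>.label/.help pair documents one @Input field of a @FormClass. A hedged sketch of the shape of output form that the "output.*" keys describe; the field names are inferred from the keys above, and the enums are trimmed stand-ins inlined only so the sketch compiles on its own:

    import org.apache.sqoop.model.FormClass;
    import org.apache.sqoop.model.Input;

    // Illustrative only; the real form and enum classes ship with the connector.
    @FormClass
    public class OutputFormSketch {
      public enum StorageType { HDFS }
      public enum OutputFormat { TEXT_FILE, SEQUENCE_FILE }
      public enum OutputCompression { NONE, DEFAULT, CUSTOM }

      @Input public StorageType storageType;              // output.storageType.*
      @Input public OutputFormat outputFormat;            // output.outputFormat.*
      @Input public OutputCompression compression;        // output.compression.*
      @Input(size = 255) public String customCompression; // output.customCompression.*
      @Input(size = 255) public String outputDirectory;   // output.outputDirectory.*
    }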
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/sqoopconnector.properties b/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
new file mode 100644
index 0000000..fa4e5e1
--- /dev/null
+++ b/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generic HDFS Connector Properties
+org.apache.sqoop.connector.class = org.apache.sqoop.connector.hdfs.HdfsConnector
+org.apache.sqoop.connector.name = hdfs-connector
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 39d48c7..1e8ab52 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -180,6 +180,10 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
return null;
}
+ if (schema == null) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+ }
+
if (fields.length != schema.getColumns().size()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
"The data " + getTextData() + " has the wrong number of fields.");
@@ -189,7 +193,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
for (int i = 0; i < fields.length; i++) {
Type colType = cols[i].getType();
- if (fields[i].equals("NULL")) {
+ //TODO: Replace with proper isNull method. Actually the entire content of the loop should be a parse method
+ if (fields[i].equals("NULL") || fields[i].equals("null") || fields[i].equals("'null'") || fields[i].isEmpty()) {
out[i] = null;
continue;
}
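The inline TODO notes that the ad-hoc string comparison should become a proper isNull method. A hedged sketch of what that helper could look like inside CSVIntermediateDataFormat, mirroring the literals checked in the loop above (name and placement are hypothetical):

    // Hypothetical helper; mirrors the literal checks in the parsing loop.
    private static boolean isNullField(String field) {
      return field == null
          || field.isEmpty()
          || field.equals("NULL")
          || field.equals("null")
          || field.equals("'null'");
    }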
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
index 9219074..4d41679 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -37,7 +37,9 @@ public enum IntermediateDataFormatError implements ErrorCode {
INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
/** Number of fields. */
- INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.")
+ INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields."),
+
+ INTERMEDIATE_DATA_FORMAT_0006("Schema missing.")
;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/pom.xml
----------------------------------------------------------------------
diff --git a/connector/pom.xml b/connector/pom.xml
index d642c3e..e98a0fc 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -35,10 +35,11 @@ limitations under the License.
<modules>
<module>connector-sdk</module>
<module>connector-generic-jdbc</module>
- <!-- Uncomment and finish connectors after sqoop framework will become stable
- <module>connector-mysql-jdbc</module>
- <module>connector-mysql-fastpath</module>
- -->
+ <module>connector-hdfs</module>
+ <!-- Uncomment and finish connectors after sqoop framework will become stable
+ <module>connector-mysql-jdbc</module>
+ <module>connector-mysql-fastpath</module>
+ -->
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
index f19a23e..46257f2 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
@@ -18,20 +18,13 @@
package org.apache.sqoop.framework;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.InputForm;
import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.framework.configuration.OutputCompression;
-import org.apache.sqoop.framework.configuration.OutputForm;
import org.apache.sqoop.framework.configuration.ThrottlingForm;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
-/**
- * Validate framework configuration objects
- */
public class FrameworkValidator extends Validator {
-
@Override
public Validation validateConnection(Object connectionConfiguration) {
Validation validation = new Validation(ConnectionConfiguration.class);
@@ -39,61 +32,16 @@ public class FrameworkValidator extends Validator {
return validation;
}
-
@Override
public Validation validateJob(Object jobConfiguration) {
- JobConfiguration configuration = (JobConfiguration)jobConfiguration;
Validation validation = new Validation(JobConfiguration.class);
- validateThrottingForm(validation, configuration.throttling);
- return super.validateJob(jobConfiguration);
- }
-
-// private Validation validateExportJob(Object jobConfiguration) {
-// Validation validation = new Validation(ExportJobConfiguration.class);
-// ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
-//
-// validateInputForm(validation, configuration.input);
-// validateThrottingForm(validation, configuration.throttling);
-//
-// return validation;
-// }
-//
-// private Validation validateImportJob(Object jobConfiguration) {
-// Validation validation = new Validation(ImportJobConfiguration.class);
-// ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
-//
-// validateOutputForm(validation, configuration.output);
-// validateThrottingForm(validation, configuration.throttling);
-//
-// return validation;
-// }
+ JobConfiguration conf = (JobConfiguration)jobConfiguration;
+ validateThrottlingForm(validation,conf.throttling);
-// private void validateInputForm(Validation validation, InputForm input) {
-// if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
-// validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
-// }
-// }
-//
-// private void validateOutputForm(Validation validation, OutputForm output) {
-// if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
-// validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
-// }
-// if(output.customCompression != null &&
-// output.customCompression.trim().length() > 0 &&
-// output.compression != OutputCompression.CUSTOM) {
-// validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-// "custom compression should be blank as " + output.compression + " is being used.");
-// }
-// if(output.compression == OutputCompression.CUSTOM &&
-// (output.customCompression == null ||
-// output.customCompression.trim().length() == 0)
-// ) {
-// validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-// "custom compression is blank.");
-// }
-// }
+ return validation;
+ };
- private void validateThrottingForm(Validation validation, ThrottlingForm throttling) {
+ private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) {
if(throttling.extractors != null && throttling.extractors < 1) {
validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor");
}
@@ -102,4 +50,5 @@ public class FrameworkValidator extends Validator {
validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader");
}
}
+
}
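The throttling check rejects non-positive extractor and loader counts. A hedged usage sketch, assuming ThrottlingForm exposes public Integer extractors/loaders fields as the null checks above imply:

    import org.apache.sqoop.framework.FrameworkValidator;
    import org.apache.sqoop.framework.configuration.JobConfiguration;
    import org.apache.sqoop.validation.Validation;

    public class ValidatorSketch {
      public static void main(String[] args) {
        JobConfiguration job = new JobConfiguration(); // constructor creates the throttling form
        job.throttling.extractors = 0;                 // invalid: must be at least 1

        Validation validation = new FrameworkValidator().validateJob(job);
        // validation now carries an UNACCEPTABLE message for "throttling" / "extractors";
        // how the caller inspects it is omitted here.
      }
    }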
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 5571928..b1b37f6 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -36,6 +36,7 @@ import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
+import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.utils.ClassUtils;
@@ -434,12 +435,17 @@ public class JobManager implements Reconfigurable {
request.getConnectorJobConfig(Direction.FROM)));
// @TODO(Abe): Alter behavior of Schema here. Need from Schema.
- // Retrieve and persist the schema
- request.getSummary().setConnectorSchema(initializer.getSchema(
- initializerContext,
- request.getConnectorConnectionConfig(Direction.FROM),
- request.getConnectorJobConfig(Direction.FROM)
- ));
+
+
+ Schema fromSchema = initializer.getSchema(initializerContext,
+ request.getConnectorConnectionConfig(Direction.FROM),
+ request.getConnectorJobConfig(Direction.FROM));
+
+ // request.getSummary().setConnectorSchema(initializer.getSchema(
+ // initializerContext,
+ // request.getConnectorConnectionConfig(ConnectorType.FROM),
+ // request.getConnectorJobConfig(ConnectorType.FROM)
+ // ));
// Initialize To Connector callback.
baseCallback = request.getToCallback();
@@ -468,6 +474,11 @@ public class JobManager implements Reconfigurable {
request.getConnectorJobConfig(Direction.TO)));
// @TODO(Abe): Alter behavior of Schema here. Need To Schema.
+
+ Schema toSchema = initializer.getSchema(initializerContext,
+ request.getConnectorConnectionConfig(Direction.TO),
+ request.getConnectorJobConfig(Direction.TO));
+
// Retrieve and persist the schema
// request.getSummary().setConnectorSchema(initializer.getSchema(
// initializerContext,
@@ -475,6 +486,12 @@ public class JobManager implements Reconfigurable {
// request.getConnectorJobConfig(ConnectorType.TO)
// ));
+ //TODO: Need better logic here
+ if (fromSchema != null)
+ request.getSummary().setConnectorSchema(fromSchema);
+ else
+ request.getSummary().setConnectorSchema(toSchema);
+
// Bootstrap job from framework perspective
prepareSubmission(request);
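The TODO flags the schema choice as provisional; for now the summary simply prefers the FROM schema and falls back to the TO schema. The same rule expressed as a tiny hypothetical helper:

    import org.apache.sqoop.schema.Schema;

    public class SchemaPickSketch {
      // Equivalent to the if/else in the hunk above.
      static Schema pickConnectorSchema(Schema fromSchema, Schema toSchema) {
        return fromSchema != null ? fromSchema : toSchema;
      }
    }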
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index be6099e..bf3f785 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -176,9 +176,11 @@ public class SubmissionRequest {
switch(type) {
case FROM:
fromConnector = connector;
+ break;
case TO:
toConnector = connector;
+ break;
default:
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
@@ -238,10 +240,10 @@ public class SubmissionRequest {
switch(type) {
case FROM:
fromConnectorConnectionConfig = config;
-
+ break;
case TO:
toConnectorConnectionConfig = config;
-
+ break;
default:
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
}
@@ -264,10 +266,10 @@ public class SubmissionRequest {
switch(type) {
case FROM:
fromConnectorJobConfig = config;
-
+ break;
case TO:
toConnectorJobConfig = config;
-
+ break;
default:
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
}
@@ -290,10 +292,10 @@ public class SubmissionRequest {
switch(type) {
case FROM:
fromFrameworkConnectionConfig = config;
-
+ break;
case TO:
toFrameworkConnectionConfig = config;
-
+ break;
default:
throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
}
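All of these hunks fix the same bug: each FROM case fell through into the TO case, so setting a FROM value silently overwrote the TO value as well. A minimal, self-contained illustration of the fall-through; the names are made up for the sketch:

    public class FallThroughSketch {
      enum Direction { FROM, TO }

      static String from, to;

      static void set(Direction type, String value) {
        switch (type) {
          case FROM:
            from = value;
            // missing "break" here: execution continues into the TO case
          case TO:
            to = value;
            break;
        }
      }

      public static void main(String[] args) {
        set(Direction.FROM, "generic-jdbc-connector");
        // Without the break both fields end up set; the patch above adds the missing breaks.
        System.out.println("from=" + from + ", to=" + to);
      }
    }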
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
deleted file mode 100644
index 6665429..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ExportJobConfiguration {
-
- @Form public InputForm input;
-
- @Form public ThrottlingForm throttling;
-
- public ExportJobConfiguration() {
- input = new InputForm();
- throttling = new ThrottlingForm();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
deleted file mode 100644
index 2a35eb9..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ImportJobConfiguration {
-
- @Form public OutputForm output;
-
- @Form public ThrottlingForm throttling;
-
- public ImportJobConfiguration() {
- output = new OutputForm();
- throttling = new ThrottlingForm();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
deleted file mode 100644
index d5cbeec..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class InputForm {
-
- @Input(size = 255) public String inputDirectory;
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
index 7c653bf..0abc611 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
@@ -22,8 +22,8 @@ import org.apache.sqoop.model.Form;
@ConfigurationClass
public class JobConfiguration {
-
- @Form public ThrottlingForm throttling;
+ @Form
+ public ThrottlingForm throttling;
public JobConfiguration() {
throttling = new ThrottlingForm();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
deleted file mode 100644
index 6cac46d..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Supported compressions
- */
-public enum OutputCompression {
- NONE,
- DEFAULT,
- DEFLATE,
- GZIP,
- BZIP2,
- LZO,
- LZ4,
- SNAPPY,
- CUSTOM,
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
deleted file mode 100644
index b2cdb44..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class OutputForm {
-
- @Input public StorageType storageType;
-
- @Input public OutputFormat outputFormat;
-
- @Input public OutputCompression compression;
-
- @Input(size = 255) public String customCompression;
-
- @Input(size = 255) public String outputDirectory;
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
deleted file mode 100644
index 4cd3589..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Various supported formats on disk
- */
-public enum OutputFormat {
- /**
- * Comma separated text file
- */
- TEXT_FILE,
-
- /**
- * Sequence file
- */
- SEQUENCE_FILE,
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java b/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
deleted file mode 100644
index dbe9f95..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Various storage types that Sqoop is supporting
- */
-public enum StorageType {
- /**
- * Direct HDFS import
- */
- HDFS,
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
index f875ceb..90395ac 100644
--- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
+++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
@@ -17,15 +17,7 @@
*/
package org.apache.sqoop.framework;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
-import org.apache.sqoop.framework.configuration.OutputCompression;
-import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
-import org.junit.Before;
-import org.junit.Test;
+//import org.apache.sqoop.framework.configuration.OutputCompression;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
index 31df04c..f19e01c 100644
--- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
+++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
@@ -28,7 +28,7 @@ import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.framework.FrameworkManager;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.model.ConfigurationClass;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index ff328cb..b05954b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -80,49 +80,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
if(request.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
}
-
- // @TODO(Abe): Move to HDFS connector.
-// if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
-// context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-// } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) {
-// context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-// } else {
-// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
-// "Format: " + jobConf.output.outputFormat);
-// }
-// if(getCompressionCodecName(jobConf) != null) {
-// context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
-// getCompressionCodecName(jobConf));
-// context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-// }
}
- // @TODO(Abe): Move to HDFS connector.
-// private String getCompressionCodecName(ImportJobConfiguration jobConf) {
-// if(jobConf.output.compression == null)
-// return null;
-// switch(jobConf.output.compression) {
-// case NONE:
-// return null;
-// case DEFAULT:
-// return "org.apache.hadoop.io.compress.DefaultCodec";
-// case DEFLATE:
-// return "org.apache.hadoop.io.compress.DeflateCodec";
-// case GZIP:
-// return "org.apache.hadoop.io.compress.GzipCodec";
-// case BZIP2:
-// return "org.apache.hadoop.io.compress.BZip2Codec";
-// case LZO:
-// return "com.hadoop.compression.lzo.LzoCodec";
-// case LZ4:
-// return "org.apache.hadoop.io.compress.Lz4Codec";
-// case SNAPPY:
-// return "org.apache.hadoop.io.compress.SnappyCodec";
-// case CUSTOM:
-// return jobConf.output.customCompression.trim();
-// }
-// return null;
-// }
+
/**
* Our execution engine have additional dependencies that needs to be available
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
deleted file mode 100644
index c3beed7..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.common.ImmutableContext;
-
-/**
- * Implementation of immutable context that is based on Hadoop configuration
- * object. Each context property is prefixed with special prefix and loaded
- * directly.
- */
-public class PrefixContext implements ImmutableContext {
-
- Configuration configuration;
- String prefix;
-
- public PrefixContext(Configuration configuration, String prefix) {
- this.configuration = configuration;
- this.prefix = prefix;
- }
-
- @Override
- public String getString(String key) {
- return configuration.get(prefix + key);
- }
-
- @Override
- public String getString(String key, String defaultValue) {
- return configuration.get(prefix + key, defaultValue);
- }
-
- @Override
- public long getLong(String key, long defaultValue) {
- return configuration.getLong(prefix + key, defaultValue);
- }
-
- @Override
- public int getInt(String key, int defaultValue) {
- return configuration.getInt(prefix + key, defaultValue);
- }
-
- @Override
- public boolean getBoolean(String key, boolean defaultValue) {
- return configuration.getBoolean(prefix + key, defaultValue);
- }
-
- /*
- * TODO: Use getter methods for retrieval instead of
- * exposing configuration directly.
- */
- public Configuration getConfiguration() {
- return configuration;
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
deleted file mode 100644
index 27afd8c..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.util.LineReader;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.etl.io.DataWriter;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-
-/**
- * Extract from HDFS.
- * Default field delimiter of a record is comma.
- */
-//public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-//
-// public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class);
-//
-// private Configuration conf;
-// private DataWriter dataWriter;
-// private long rowRead = 0;
-//
-// @Override
-// public void extract(ExtractorContext context,
-// ConnectionConfiguration connectionConfiguration,
-// ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-//
-// conf = ((PrefixContext) context.getContext()).getConfiguration();
-// dataWriter = context.getDataWriter();
-//
-// try {
-// HdfsExportPartition p = partition;
-// LOG.info("Working on partition: " + p);
-// int numFiles = p.getNumberOfFiles();
-// for (int i = 0; i < numFiles; i++) {
-// extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
-// }
-// } catch (IOException e) {
-// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
-// }
-// }
-//
-// private void extractFile(Path file, long start, long length)
-// throws IOException {
-// long end = start + length;
-// LOG.info("Extracting file " + file);
-// LOG.info("\t from offset " + start);
-// LOG.info("\t to offset " + end);
-// LOG.info("\t of length " + length);
-// if(isSequenceFile(file)) {
-// extractSequenceFile(file, start, length);
-// } else {
-// extractTextFile(file, start, length);
-// }
-// }
-//
-// /**
-// * Extracts Sequence file
-// * @param file
-// * @param start
-// * @param length
-// * @throws IOException
-// */
-// private void extractSequenceFile(Path file, long start, long length)
-// throws IOException {
-// LOG.info("Extracting sequence file");
-// long end = start + length;
-// SequenceFile.Reader filereader = new SequenceFile.Reader(
-// file.getFileSystem(conf), file, conf);
-//
-// if (start > filereader.getPosition()) {
-// filereader.sync(start); // sync to start
-// }
-//
-// Text line = new Text();
-// boolean hasNext = filereader.next(line);
-// while (hasNext) {
-// rowRead++;
-// dataWriter.writeStringRecord(line.toString());
-// line = new Text();
-// hasNext = filereader.next(line);
-// if (filereader.getPosition() >= end && filereader.syncSeen()) {
-// break;
-// }
-// }
-// filereader.close();
-// }
-//
-// /**
-// * Extracts Text file
-// * @param file
-// * @param start
-// * @param length
-// * @throws IOException
-// */
-// private void extractTextFile(Path file, long start, long length)
-// throws IOException {
-// LOG.info("Extracting text file");
-// long end = start + length;
-// FileSystem fs = file.getFileSystem(conf);
-// FSDataInputStream filestream = fs.open(file);
-// CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
-// LineReader filereader;
-// Seekable fileseeker = filestream;
-//
-// // Hadoop 1.0 does not have support for custom record delimiter and thus
-// // we
-// // are supporting only default one.
-// // We might add another "else if" case for SplittableCompressionCodec once
-// // we drop support for Hadoop 1.0.
-// if (codec == null) {
-// filestream.seek(start);
-// filereader = new LineReader(filestream);
-// } else {
-// filereader = new LineReader(codec.createInputStream(filestream,
-// codec.createDecompressor()), conf);
-// fileseeker = filestream;
-// }
-// if (start != 0) {
-// // always throw away first record because
-// // one extra line is read in previous split
-// start += filereader.readLine(new Text(), 0);
-// }
-// int size;
-// LOG.info("Start position: " + String.valueOf(start));
-// long next = start;
-// while (next <= end) {
-// Text line = new Text();
-// size = filereader.readLine(line, Integer.MAX_VALUE);
-// if (size == 0) {
-// break;
-// }
-// if (codec == null) {
-// next += size;
-// } else {
-// next = fileseeker.getPos();
-// }
-// rowRead++;
-// dataWriter.writeStringRecord(line.toString());
-// }
-// LOG.info("Extracting ended on position: " + fileseeker.getPos());
-// filestream.close();
-// }
-//
-// @Override
-// public long getRowsRead() {
-// return rowRead;
-// }
-//
-// /**
-// * Returns true if given file is sequence
-// * @param file
-// * @return boolean
-// */
-// private boolean isSequenceFile(Path file) {
-// SequenceFile.Reader filereader = null;
-// try {
-// filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
-// filereader.close();
-// } catch (IOException e) {
-// return false;
-// }
-// return true;
-// }
-//}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
deleted file mode 100644
index cdbdaa8..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.etl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * This class derives mostly from CombineFileSplit of Hadoop, i.e.
- * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
- */
-public class HdfsExportPartition extends Partition {
-
- private long lenFiles;
- private int numFiles;
- private Path[] files;
- private long[] offsets;
- private long[] lengths;
- private String[] locations;
-
- public HdfsExportPartition() {}
-
- public HdfsExportPartition(Path[] files, long[] offsets,
- long[] lengths, String[] locations) {
- for(long length : lengths) {
- this.lenFiles += length;
- }
- this.numFiles = files.length;
- this.files = files;
- this.offsets = offsets;
- this.lengths = lengths;
- this.locations = locations;
- }
-
- public long getLengthOfFiles() {
- return lenFiles;
- }
-
- public int getNumberOfFiles() {
- return numFiles;
- }
-
- public Path getFile(int i) {
- return files[i];
- }
-
- public long getOffset(int i) {
- return offsets[i];
- }
-
- public long getLength(int i) {
- return lengths[i];
- }
-
- public String[] getLocations() {
- return locations;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- numFiles = in.readInt();
-
- files = new Path[numFiles];
- for(int i=0; i<numFiles; i++) {
- files[i] = new Path(in.readUTF());
- }
-
- offsets = new long[numFiles];
- for(int i=0; i<numFiles; i++) {
- offsets[i] = in.readLong();
- }
-
- lengths = new long[numFiles];
- for(int i=0; i<numFiles; i++) {
- lengths[i] = in.readLong();
- }
-
- for(long length : lengths) {
- lenFiles += length;
- }
-
- int numLocations = in.readInt();
- if (numLocations == 0) {
- locations = null;
- } else {
- locations = new String[numLocations];
- for(int i=0; i<numLocations; i++) {
- locations[i] = in.readUTF();
- }
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(numFiles);
-
- for(Path file : files) {
- out.writeUTF(file.toString());
- }
-
- for(long offset : offsets) {
- out.writeLong(offset);
- }
-
- for(long length : lengths) {
- out.writeLong(length);
- }
-
- if (locations == null || locations.length == 0) {
- out.writeInt(0);
- } else {
- out.writeInt(locations.length);
- for(String location : locations) {
- out.writeUTF(location);
- }
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("{");
- boolean first = true;
- for(int i = 0; i < files.length; i++) {
- if(first) {
- first = false;
- } else {
- sb.append(", ");
- }
-
- sb.append(files[i]);
- sb.append(" (offset=").append(offsets[i]);
- sb.append(", end=").append(offsets[i] + lengths[i]);
- sb.append(", length=").append(lengths[i]);
- sb.append(")");
- }
- sb.append("}");
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
deleted file mode 100644
index b3590dc..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-
-/**
- * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
- * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
- */
-public class HdfsExportPartitioner extends Partitioner {
-
- public static final String SPLIT_MINSIZE_PERNODE =
- "mapreduce.input.fileinputformat.split.minsize.per.node";
- public static final String SPLIT_MINSIZE_PERRACK =
- "mapreduce.input.fileinputformat.split.minsize.per.rack";
-
- // ability to limit the size of a single split
- private long maxSplitSize = 0;
- private long minSplitSizeNode = 0;
- private long minSplitSizeRack = 0;
-
- // mapping from a rack name to the set of Nodes in the rack
- private HashMap<String, Set<String>> rackToNodes =
- new HashMap<String, Set<String>>();
-
- @Override
- public List<Partition> getPartitions(PartitionerContext context,
- Object connectionConfiguration, Object jobConfiguration) {
-
- Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
-
- try {
- long numInputBytes = getInputSize(conf);
- maxSplitSize = numInputBytes / context.getMaxPartitions();
-
- if(numInputBytes % context.getMaxPartitions() != 0 ) {
- maxSplitSize += 1;
- }
-
- long minSizeNode = 0;
- long minSizeRack = 0;
- long maxSize = 0;
-
- // the values specified by setxxxSplitSize() takes precedence over the
- // values that might have been specified in the config
- if (minSplitSizeNode != 0) {
- minSizeNode = minSplitSizeNode;
- } else {
- minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
- }
- if (minSplitSizeRack != 0) {
- minSizeRack = minSplitSizeRack;
- } else {
- minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
- }
- if (maxSplitSize != 0) {
- maxSize = maxSplitSize;
- } else {
- maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
- }
- if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
- throw new IOException("Minimum split size per node " + minSizeNode +
- " cannot be larger than maximum split size " +
- maxSize);
- }
- if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
- throw new IOException("Minimum split size per rack " + minSizeRack +
- " cannot be larger than maximum split size " +
- maxSize);
- }
- if (minSizeRack != 0 && minSizeNode > minSizeRack) {
- throw new IOException("Minimum split size per node " + minSizeNode +
- " cannot be smaller than minimum split " +
- "size per rack " + minSizeRack);
- }
-
- // all the files in input set
- String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
- FileSystem fs = FileSystem.get(conf);
-
- List<Path> paths = new LinkedList<Path>();
- for(FileStatus status : fs.listStatus(new Path(indir))) {
- if(!status.isDir()) {
- paths.add(status.getPath());
- }
- }
-
- List<Partition> partitions = new ArrayList<Partition>();
- if (paths.size() == 0) {
- return partitions;
- }
-
- // create splits for all files that are not in any pool.
- getMoreSplits(conf, paths,
- maxSize, minSizeNode, minSizeRack, partitions);
-
- // free up rackToNodes map
- rackToNodes.clear();
-
- return partitions;
-
- } catch (IOException e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0021, e);
- }
- }
-
- private long getInputSize(Configuration conf) throws IOException {
- String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
- FileSystem fs = FileSystem.get(conf);
- FileStatus[] files = fs.listStatus(new Path(indir));
- long count = 0;
- for (FileStatus file : files) {
- count += file.getLen();
- }
- return count;
- }
-
- /**
- * Return all the splits in the specified set of paths
- */
- private void getMoreSplits(Configuration conf, List<Path> paths,
- long maxSize, long minSizeNode, long minSizeRack,
- List<Partition> partitions) throws IOException {
-
- // all blocks for all the files in input set
- OneFileInfo[] files;
-
- // mapping from a rack name to the list of blocks it has
- HashMap<String, List<OneBlockInfo>> rackToBlocks =
- new HashMap<String, List<OneBlockInfo>>();
-
- // mapping from a block to the nodes on which it has replicas
- HashMap<OneBlockInfo, String[]> blockToNodes =
- new HashMap<OneBlockInfo, String[]>();
-
- // mapping from a node to the list of blocks that it contains
- HashMap<String, List<OneBlockInfo>> nodeToBlocks =
- new HashMap<String, List<OneBlockInfo>>();
-
- files = new OneFileInfo[paths.size()];
- if (paths.size() == 0) {
- return;
- }
-
- // populate all the blocks for all files
- for (int i = 0; i < paths.size(); i++) {
- files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)),
- rackToBlocks, blockToNodes, nodeToBlocks,
- rackToNodes, maxSize);
- }
-
- ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
- Set<String> nodes = new HashSet<String>();
- long curSplitSize = 0;
-
- // process all nodes and create splits that are local
- // to a node.
- for (Iterator<Map.Entry<String,
- List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
- iter.hasNext();) {
-
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- nodes.add(one.getKey());
- List<OneBlockInfo> blocksInNode = one.getValue();
-
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- for (OneBlockInfo oneblock : blocksInNode) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(partitions, nodes, validBlocks);
- curSplitSize = 0;
- validBlocks.clear();
- }
- }
- }
- // if there were any blocks left over and their combined size is
- // larger than minSplitNode, then combine them into one split.
- // Otherwise add them back to the unprocessed pool. It is likely
- // that they will be combined with other blocks from the
- // same rack later on.
- if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
- // create an input split and add it to the splits array
- addCreatedSplit(partitions, nodes, validBlocks);
- } else {
- for (OneBlockInfo oneblock : validBlocks) {
- blockToNodes.put(oneblock, oneblock.hosts);
- }
- }
- validBlocks.clear();
- nodes.clear();
- curSplitSize = 0;
- }
-
- // if blocks in a rack are below the specified minimum size, then keep them
- // in 'overflow'. After the processing of all racks is complete, these
- // overflow blocks will be combined into splits.
- ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
- Set<String> racks = new HashSet<String>();
-
- // Process all racks over and over again until there is no more work to do.
- while (blockToNodes.size() > 0) {
-
- // Create one split for this rack before moving over to the next rack.
- // Come back to this rack after creating a single split for each of the
- // remaining racks.
- // Process one rack location at a time, combining all blocks that reside
- // on this rack into one split (constrained by the minimum and maximum
- // split sizes).
-
- // iterate over all racks
- for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
- rackToBlocks.entrySet().iterator(); iter.hasNext();) {
-
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- racks.add(one.getKey());
- List<OneBlockInfo> blocks = one.getValue();
-
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- boolean createdSplit = false;
- for (OneBlockInfo oneblock : blocks) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(partitions, getHosts(racks), validBlocks);
- createdSplit = true;
- break;
- }
- }
- }
-
- // if we created a split, then just go to the next rack
- if (createdSplit) {
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- continue;
- }
-
- if (!validBlocks.isEmpty()) {
- if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
- // if there is a minimum size specified, then create a single split
- // otherwise, store these blocks into overflow data structure
- addCreatedSplit(partitions, getHosts(racks), validBlocks);
- } else {
- // There were a few blocks in this rack that
- // remained to be processed. Keep them in 'overflow' block list.
- // These will be combined later.
- overflowBlocks.addAll(validBlocks);
- }
- }
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- }
- }
-
- assert blockToNodes.isEmpty();
- assert curSplitSize == 0;
- assert validBlocks.isEmpty();
- assert racks.isEmpty();
-
- // Process all overflow blocks
- for (OneBlockInfo oneblock : overflowBlocks) {
- validBlocks.add(oneblock);
- curSplitSize += oneblock.length;
-
- // This might cause an existing rack location to be re-added,
- // but it should be ok.
- for (int i = 0; i < oneblock.racks.length; i++) {
- racks.add(oneblock.racks[i]);
- }
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(partitions, getHosts(racks), validBlocks);
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- }
- }
-
- // Process any remaining blocks, if any.
- if (!validBlocks.isEmpty()) {
- addCreatedSplit(partitions, getHosts(racks), validBlocks);
- }
- }
-
- private boolean isSplitable(Configuration conf, Path file) {
- final CompressionCodec codec =
- new CompressionCodecFactory(conf).getCodec(file);
-
- // This method might be improved for SplittableCompression codec when we
- // drop support for Hadoop 1.0
- return null == codec;
-
- }
-
- /**
- * Create a single split from the list of blocks specified in validBlocks.
- * Add this new split into the list.
- */
- private void addCreatedSplit(List<Partition> partitions,
- Collection<String> locations,
- ArrayList<OneBlockInfo> validBlocks) {
- // create an input split
- Path[] files = new Path[validBlocks.size()];
- long[] offsets = new long[validBlocks.size()];
- long[] lengths = new long[validBlocks.size()];
- for (int i = 0; i < validBlocks.size(); i++) {
- files[i] = validBlocks.get(i).onepath;
- offsets[i] = validBlocks.get(i).offset;
- lengths[i] = validBlocks.get(i).length;
- }
-
- // add this split to the list that is returned
- HdfsExportPartition partition = new HdfsExportPartition(
- files, offsets, lengths, locations.toArray(new String[0]));
- partitions.add(partition);
- }
-
- private Set<String> getHosts(Set<String> racks) {
- Set<String> hosts = new HashSet<String>();
- for (String rack : racks) {
- if (rackToNodes.containsKey(rack)) {
- hosts.addAll(rackToNodes.get(rack));
- }
- }
- return hosts;
- }
-
- private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
- String rack, String host) {
- Set<String> hosts = rackToNodes.get(rack);
- if (hosts == null) {
- hosts = new HashSet<String>();
- rackToNodes.put(rack, hosts);
- }
- hosts.add(host);
- }
-
- /**
- * information about one file from the File System
- */
- private static class OneFileInfo {
- private long fileSize; // size of the file
- private OneBlockInfo[] blocks; // all blocks in this file
-
- OneFileInfo(Path path, Configuration conf,
- boolean isSplitable,
- HashMap<String, List<OneBlockInfo>> rackToBlocks,
- HashMap<OneBlockInfo, String[]> blockToNodes,
- HashMap<String, List<OneBlockInfo>> nodeToBlocks,
- HashMap<String, Set<String>> rackToNodes,
- long maxSize)
- throws IOException {
- this.fileSize = 0;
-
- // get block locations from file system
- FileSystem fs = path.getFileSystem(conf);
- FileStatus stat = fs.getFileStatus(path);
- BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
- stat.getLen());
- // create a list of all block and their locations
- if (locations == null) {
- blocks = new OneBlockInfo[0];
- } else {
- if (!isSplitable) {
- // if the file is not splitable, just create the one block with
- // full file length
- blocks = new OneBlockInfo[1];
- fileSize = stat.getLen();
- blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
- .getHosts(), locations[0].getTopologyPaths());
- } else {
- ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
- locations.length);
- for (int i = 0; i < locations.length; i++) {
- fileSize += locations[i].getLength();
-
- // each split can be a maximum of maxSize
- long left = locations[i].getLength();
- long myOffset = locations[i].getOffset();
- long myLength = 0;
- do {
- if (maxSize == 0) {
- myLength = left;
- } else {
- if (left > maxSize && left < 2 * maxSize) {
- // if the remainder is between max and 2*max, then instead of
- // creating splits of size max and left-max, we create two splits
- // of size left/2 each. This is a heuristic to avoid creating
- // very small splits.
- myLength = left / 2;
- } else {
- myLength = Math.min(maxSize, left);
- }
- }
- OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
- myLength, locations[i].getHosts(), locations[i]
- .getTopologyPaths());
- left -= myLength;
- myOffset += myLength;
-
- blocksList.add(oneblock);
- } while (left > 0);
- }
- blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
- }
-
- for (OneBlockInfo oneblock : blocks) {
- // add this block to the block --> node locations map
- blockToNodes.put(oneblock, oneblock.hosts);
-
- // For blocks that do not have host/rack information,
- // assign to default rack.
- String[] racks = null;
- if (oneblock.hosts.length == 0) {
- racks = new String[]{NetworkTopology.DEFAULT_RACK};
- } else {
- racks = oneblock.racks;
- }
-
- // add this block to the rack --> block map
- for (int j = 0; j < racks.length; j++) {
- String rack = racks[j];
- List<OneBlockInfo> blklist = rackToBlocks.get(rack);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- rackToBlocks.put(rack, blklist);
- }
- blklist.add(oneblock);
- if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
- // Add this host to rackToNodes map
- addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
- }
- }
-
- // add this block to the node --> block map
- for (int j = 0; j < oneblock.hosts.length; j++) {
- String node = oneblock.hosts[j];
- List<OneBlockInfo> blklist = nodeToBlocks.get(node);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- nodeToBlocks.put(node, blklist);
- }
- blklist.add(oneblock);
- }
- }
- }
- }
-
- }
-
- /**
- * information about one block from the File System
- */
- private static class OneBlockInfo {
- Path onepath; // path of the file this block belongs to
- long offset; // offset in file
- long length; // length of this block
- String[] hosts; // nodes on which this block resides
- String[] racks; // network topology of hosts
-
- OneBlockInfo(Path path, long offset, long len,
- String[] hosts, String[] topologyPaths) {
- this.onepath = path;
- this.offset = offset;
- this.hosts = hosts;
- this.length = len;
- assert (hosts.length == topologyPaths.length ||
- topologyPaths.length == 0);
-
- // if the file system does not have any rack information, then
- // use dummy rack location.
- if (topologyPaths.length == 0) {
- topologyPaths = new String[hosts.length];
- for (int i = 0; i < topologyPaths.length; i++) {
- topologyPaths[i] = (new NodeBase(hosts[i],
- NetworkTopology.DEFAULT_RACK)).toString();
- }
- }
-
- // The topology paths have the host name included as the last
- // component. Strip it.
- this.racks = new String[topologyPaths.length];
- for (int i = 0; i < topologyPaths.length; i++) {
- this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
- }
- }
- }
-
-}
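
The sizing logic of the partitioner removed above reduces to two rules: divide the
total input size by the maximum number of partitions (rounding up), and split a
leftover block in half when its remaining length falls between maxSize and
2 * maxSize so that no tiny trailing split is produced. The standalone Java sketch
below is illustrative only and not part of this patch; the class and method names
are invented for the example.

    public class SplitSizingSketch {

      // Ceiling division: spread the total input size over at most maxPartitions splits.
      static long maxSplitSize(long numInputBytes, int maxPartitions) {
        long size = numInputBytes / maxPartitions;
        if (numInputBytes % maxPartitions != 0) {
          size += 1;
        }
        return size;
      }

      // Length of the next chunk carved from a block with 'left' bytes remaining.
      static long nextChunkLength(long left, long maxSize) {
        if (maxSize == 0) {
          return left;                        // no limit configured
        }
        if (left > maxSize && left < 2 * maxSize) {
          return left / 2;                    // avoid a very small trailing split
        }
        return Math.min(maxSize, left);
      }

      public static void main(String[] args) {
        System.out.println(maxSplitSize(1000, 3));       // 334
        System.out.println(nextChunkLength(150, 100));   // 75 rather than 100 + 50
      }
    }
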
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
deleted file mode 100644
index d4ffb13..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.etl.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsSequenceImportLoader extends Loader {
-
- public static final String EXTENSION = ".seq";
-
- @Override
- public void load(LoaderContext context, Object oc, Object oj) throws Exception {
- DataReader reader = context.getDataReader();
-
- Configuration conf = new Configuration();
-// Configuration conf = ((EtlContext)context).getConfiguration();
- String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
- String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
- CompressionCodec codec = null;
- if (codecname != null) {
- Class<?> clz = ClassUtils.loadClass(codecname);
- if (clz == null) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname);
- }
-
- try {
- codec = (CompressionCodec) clz.newInstance();
- if (codec instanceof Configurable) {
- ((Configurable) codec).setConf(conf);
- }
- } catch (Exception e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e);
- }
- }
-
- filename += EXTENSION;
-
- try {
- Path filepath = new Path(filename);
- SequenceFile.Writer filewriter;
- if (codec != null) {
- filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
- conf, filepath, Text.class, NullWritable.class,
- CompressionType.BLOCK, codec);
- } else {
- filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
- conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
- }
-
- String csv;
- Text text = new Text();
- while ((csv = reader.readTextRecord()) != null) {
- text.set(csv);
- filewriter.append(text, NullWritable.get());
- }
- filewriter.close();
-
- } catch (IOException e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e);
- }
-
- }
-
-}
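
The loader removed above (and the text loader removed below) turns a codec class
name into a configured CompressionCodec via reflection. The sketch below restates
that pattern in isolation; it is not part of the patch and deliberately uses plain
Class.forName() and an unchecked exception in place of Sqoop's ClassUtils and
SqoopException. The Configurable check lets codecs pick up settings such as
compression level or buffer size from the job configuration.

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;

    public final class CodecLoadingSketch {

      // Instantiate a compression codec by class name and hand it the job
      // configuration when it is Configurable; return null when no codec is set.
      static CompressionCodec loadCodec(String codecName, Configuration conf) {
        if (codecName == null) {
          return null;
        }
        try {
          Class<?> clz = Class.forName(codecName);
          CompressionCodec codec = (CompressionCodec) clz.newInstance();
          if (codec instanceof Configurable) {
            ((Configurable) codec).setConf(conf);
          }
          return codec;
        } catch (ReflectiveOperationException e) {
          throw new IllegalArgumentException("Cannot load codec " + codecName, e);
        }
      }
    }
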
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
deleted file mode 100644
index 7b799ca..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import com.google.common.base.Charsets;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsTextImportLoader extends Loader {
-
- private final char recordDelimiter;
-
- public HdfsTextImportLoader() {
- recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
- }
-
- @Override
- public void load(LoaderContext context, Object oc, Object oj) throws Exception{
- DataReader reader = context.getDataReader();
-
- Configuration conf = new Configuration();
-// Configuration conf = ((EtlContext)context).getConfiguration();
- String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
- String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
- CompressionCodec codec = null;
- if (codecname != null) {
- Class<?> clz = ClassUtils.loadClass(codecname);
- if (clz == null) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname);
- }
-
- try {
- codec = (CompressionCodec) clz.newInstance();
- if (codec instanceof Configurable) {
- ((Configurable) codec).setConf(conf);
- }
- } catch (Exception e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e);
- }
-
- filename += codec.getDefaultExtension();
- }
-
- try {
- Path filepath = new Path(filename);
- FileSystem fs = filepath.getFileSystem(conf);
-
- BufferedWriter filewriter;
- DataOutputStream filestream = fs.create(filepath, false);
- if (codec != null) {
- filewriter = new BufferedWriter(new OutputStreamWriter(
- codec.createOutputStream(filestream, codec.createCompressor()),
- Charsets.UTF_8));
- } else {
- filewriter = new BufferedWriter(new OutputStreamWriter(
- filestream, Charsets.UTF_8));
- }
-
- String csv;
- while ((csv = reader.readTextRecord()) != null) {
- filewriter.write(csv + recordDelimiter);
- }
- filewriter.close();
-
- } catch (IOException e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e);
- }
-
- }
-
-}
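
The text loader removed above follows a common HDFS write pattern: create the file,
wrap the stream in the codec when one is configured, then push delimiter-terminated
records through a UTF-8 BufferedWriter. The sketch below is illustrative only; it
substitutes java.nio.charset.StandardCharsets for Guava's Charsets, and its class
and method names are not part of the patch.

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;

    public final class TextWriteSketch {

      // Write each record followed by the record delimiter, compressing the
      // stream when a codec is supplied.
      static void writeRecords(FileSystem fs, Path file, Iterable<String> records,
          CompressionCodec codec, char recordDelimiter) throws IOException {
        OutputStream out = fs.create(file, false);
        if (codec != null) {
          out = codec.createOutputStream(out, codec.createCompressor());
        }
        BufferedWriter writer =
            new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        try {
          for (String record : records) {
            writer.write(record + recordDelimiter);
          }
        } finally {
          writer.close();
        }
      }
    }
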
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 8e31ef5..59431f4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.schema.Schema;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index e96909a..1c1133a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -33,7 +33,7 @@ import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 6e2cfbf..1d60ba3 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -31,7 +31,7 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;
@@ -66,7 +66,16 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
// Propagate connector schema in every case for now
// TODO: Change to conditional choosing between Connector schemas.
+
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
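+ // Fall back to the TO direction schema when no FROM schema is available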
+ if (schema == null) {
+ schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+ }
+
+ if (schema == null) {
+ LOG.info("setting an empty schema");
+ }
+
String intermediateDataFormatName = conf.get(JobConstants
.INTERMEDIATE_DATA_FORMAT);