Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/20 22:28:27 UTC
[3/3] git commit: SQOOP-1375: Sqoop2: From/To: Create HDFS connector
SQOOP-1375: Sqoop2: From/To: Create HDFS connector
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5c29a2a2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5c29a2a2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5c29a2a2
Branch: refs/heads/SQOOP-1367
Commit: 5c29a2a291b54d1c15f7e2482893bc074a011a76
Parents: 7127948
Author: Gwen Shapira <cs...@gmail.com>
Authored: Wed Aug 20 13:23:41 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Wed Aug 20 13:23:41 2014 -0700
----------------------------------------------------------------------
.../org/apache/sqoop/client/SqoopClient.java | 2 +-
common/pom.xml | 66 +++
.../org/apache/sqoop/common/PrefixContext.java | 70 +++
connector/connector-hdfs/pom.xml | 83 +++
.../sqoop/connector/hdfs/HdfsConnector.java | 132 +++++
.../connector/hdfs/HdfsConnectorError.java | 52 ++
.../sqoop/connector/hdfs/HdfsConstants.java | 31 ++
.../sqoop/connector/hdfs/HdfsDestroyer.java | 36 ++
.../sqoop/connector/hdfs/HdfsExtractor.java | 199 +++++++
.../sqoop/connector/hdfs/HdfsInitializer.java | 45 ++
.../apache/sqoop/connector/hdfs/HdfsLoader.java | 140 +++++
.../sqoop/connector/hdfs/HdfsPartition.java | 161 ++++++
.../sqoop/connector/hdfs/HdfsPartitioner.java | 555 +++++++++++++++++++
.../sqoop/connector/hdfs/HdfsValidator.java | 89 +++
.../configuration/ConnectionConfiguration.java | 31 ++
.../hdfs/configuration/ConnectionForm.java | 29 +
.../configuration/FromJobConfiguration.java | 32 ++
.../connector/hdfs/configuration/InputForm.java | 30 +
.../hdfs/configuration/OutputCompression.java | 33 ++
.../hdfs/configuration/OutputForm.java | 36 ++
.../hdfs/configuration/OutputFormat.java | 33 ++
.../hdfs/configuration/StorageType.java | 28 +
.../hdfs/configuration/ToJobConfiguration.java | 31 ++
.../hdfs/hdfsWriter/GenericHdfsWriter.java | 34 ++
.../hdfs/hdfsWriter/HdfsSequenceWriter.java | 57 ++
.../hdfs/hdfsWriter/HdfsTextWriter.java | 61 ++
.../hdfs-connector-resources.properties | 58 ++
.../main/resources/sqoopconnector.properties | 18 +
.../idf/CSVIntermediateDataFormat.java | 7 +-
.../idf/IntermediateDataFormatError.java | 4 +-
connector/pom.xml | 9 +-
.../sqoop/framework/FrameworkValidator.java | 63 +--
.../org/apache/sqoop/framework/JobManager.java | 29 +-
.../sqoop/framework/SubmissionRequest.java | 14 +-
.../configuration/ExportJobConfiguration.java | 37 --
.../configuration/ImportJobConfiguration.java | 37 --
.../framework/configuration/InputForm.java | 30 -
.../configuration/JobConfiguration.java | 4 +-
.../configuration/OutputCompression.java | 33 --
.../framework/configuration/OutputForm.java | 38 --
.../framework/configuration/OutputFormat.java | 33 --
.../framework/configuration/StorageType.java | 28 -
.../sqoop/framework/TestFrameworkValidator.java | 10 +-
.../sqoop/repository/TestJdbcRepository.java | 2 +-
.../mapreduce/MapreduceExecutionEngine.java | 42 +-
.../org/apache/sqoop/job/PrefixContext.java | 70 ---
.../sqoop/job/etl/HdfsExportExtractor.java | 194 -------
.../sqoop/job/etl/HdfsExportPartition.java | 160 ------
.../sqoop/job/etl/HdfsExportPartitioner.java | 552 ------------------
.../sqoop/job/etl/HdfsSequenceImportLoader.java | 94 ----
.../sqoop/job/etl/HdfsTextImportLoader.java | 101 ----
.../sqoop/job/mr/SqoopDestroyerExecutor.java | 2 +-
.../apache/sqoop/job/mr/SqoopInputFormat.java | 2 +-
.../org/apache/sqoop/job/mr/SqoopMapper.java | 11 +-
.../job/mr/SqoopOutputFormatLoadExecutor.java | 10 +-
.../mapreduce/MapreduceExecutionEngineTest.java | 13 +-
.../org/apache/sqoop/job/TestHdfsExtract.java | 31 +-
.../java/org/apache/sqoop/job/TestHdfsLoad.java | 33 +-
pom.xml | 11 +
.../derby/DerbyRepositoryHandler.java | 50 +-
server/pom.xml | 5 +
.../apache/sqoop/handler/JobRequestHandler.java | 1 +
test/pom.xml | 5 +
.../sqoop/test/testcases/ConnectorTestCase.java | 4 +-
.../connector/jdbc/generic/TableImportTest.java | 8 -
.../jdbc/generic/imports/PartitionerTest.java | 8 -
.../SubmissionWithDisabledModelObjectsTest.java | 11 -
67 files changed, 2309 insertions(+), 1659 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
index 1d93ae3..2b3171c 100644
--- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
+++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
@@ -369,7 +369,7 @@ public class SqoopClient {
fromConnection.getPersistenceId(),
toConnection.getPersistenceId(),
getConnector(fromConnection.getConnectorId()).getJobForms(Direction.FROM),
- getConnector(fromConnection.getConnectorId()).getJobForms(Direction.TO),
+ getConnector(toConnection.getConnectorId()).getJobForms(Direction.TO),
getFramework().getJobForms()
);
}
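Note on the change above: the client now requests the TO-direction job forms from the connector of the destination connection (toConnection) instead of reusing the source connection's connector, so jobs built between two different connectors pick up the correct forms on each side.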
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 9bfa07d..151a649 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -106,5 +106,71 @@ limitations under the License.
</plugin>
</plugins>
</build>
+ <!-- Profiles for various supported Hadoop distributions -->
+ <profiles>
+
+ <!-- Hadoop 1.x -->
+ <profile>
+ <id>hadoop100</id>
+
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>100</value>
+ </property>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <!-- Hadoop 2.x (active by default) -->
+ <profile>
+ <id>hadoop200</id>
+
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ <property>
+ <name>hadoop.profile</name>
+ <value>200</value>
+ </property>
+ </activation>
+
+ <properties>
+ <hadoop.profile>200</hadoop.profile>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+ </profile>
+ </profiles>
</project>
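The profiles above follow the standard Maven property-activation pattern: hadoop200 is active by default, and a Hadoop 1.x build is selected by passing the activation property shown (hadoop.profile=100) on the Maven command line, e.g. as -Dhadoop.profile=100; the exact build invocation is not part of this diff and is mentioned here only for orientation.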
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/common/src/main/java/org/apache/sqoop/common/PrefixContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/PrefixContext.java b/common/src/main/java/org/apache/sqoop/common/PrefixContext.java
new file mode 100644
index 0000000..6434e6d
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/common/PrefixContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.common.ImmutableContext;
+
+/**
+ * Implementation of an immutable context backed by a Hadoop configuration
+ * object. Each context property is stored under a common prefix and read
+ * directly from the underlying configuration.
+ */
+public class PrefixContext implements ImmutableContext {
+
+ Configuration configuration;
+ String prefix;
+
+ public PrefixContext(Configuration configuration, String prefix) {
+ this.configuration = configuration;
+ this.prefix = prefix;
+ }
+
+ @Override
+ public String getString(String key) {
+ return configuration.get(prefix + key);
+ }
+
+ @Override
+ public String getString(String key, String defaultValue) {
+ return configuration.get(prefix + key, defaultValue);
+ }
+
+ @Override
+ public long getLong(String key, long defaultValue) {
+ return configuration.getLong(prefix + key, defaultValue);
+ }
+
+ @Override
+ public int getInt(String key, int defaultValue) {
+ return configuration.getInt(prefix + key, defaultValue);
+ }
+
+ @Override
+ public boolean getBoolean(String key, boolean defaultValue) {
+ return configuration.getBoolean(prefix + key, defaultValue);
+ }
+
+ /*
+ * TODO: Use getter methods for retrieval instead of
+ * exposing configuration directly.
+ */
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+}
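For orientation, a minimal usage sketch (not part of the commit; the prefix value and property names are illustrative) of how connector code can read prefixed entries out of a Hadoop Configuration through PrefixContext:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.common.PrefixContext;

public class PrefixContextSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The framework stores connector properties under a common prefix...
    conf.set("connector.from.context.inputDirectory", "/data/in");

    // ...and the connector reads them back without repeating the prefix.
    PrefixContext context = new PrefixContext(conf, "connector.from.context.");
    String dir = context.getString("inputDirectory");   // "/data/in"
    long max = context.getLong("maxRecords", 100L);      // 100 (default value)
    System.out.println(dir + " " + max);
  }
}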
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
new file mode 100644
index 0000000..8df9f11
--- /dev/null
+++ b/connector/connector-hdfs/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.sqoop.connector</groupId>
+ <artifactId>sqoop-connector-hdfs</artifactId>
+ <name>Sqoop HDFS Connector</name>
+
+ <!-- TODO: Hardcoding Hadoop200 for now -->
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>sqoop-spi</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <finalName>sqoop</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
new file mode 100644
index 0000000..557091e
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+import org.apache.sqoop.validation.Validator;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+public class HdfsConnector extends SqoopConnector {
+
+ private static final From FROM = new From(
+ HdfsInitializer.class,
+ HdfsPartitioner.class,
+ HdfsExtractor.class,
+ HdfsDestroyer.class);
+
+ private static final To TO = new To(
+ HdfsInitializer.class,
+ HdfsLoader.class,
+ HdfsDestroyer.class);
+
+ private static final HdfsValidator hdfsValidator = new HdfsValidator();
+
+ /**
+ * Retrieve connector version.
+ *
+ * @return Version encoded as a string
+ */
+ @Override
+ public String getVersion() {
+ return VersionInfo.getVersion();
+ }
+
+ /**
+ * @param locale
+ * @return the resource bundle associated with the given locale.
+ */
+ @Override
+ public ResourceBundle getBundle(Locale locale) {
+ return ResourceBundle.getBundle(
+ HdfsConstants.RESOURCE_BUNDLE_NAME, locale);
+ }
+
+ /**
+ * @return Get connection configuration class
+ */
+ @Override
+ public Class getConnectionConfigurationClass() {
+ return ConnectionConfiguration.class;
+ }
+
+ /**
+ * @param jobType
+ * @return Get job configuration class for given type or null if not supported
+ */
+ @Override
+ public Class getJobConfigurationClass(Direction jobType) {
+ switch (jobType) {
+ case FROM:
+ return FromJobConfiguration.class;
+ case TO:
+ return ToJobConfiguration.class;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * @return a <tt>From</tt> that provides classes for performing import.
+ */
+ @Override
+ public From getFrom() {
+ return FROM;
+ }
+
+ /**
+ * @return a <tt>To</tt> that provides classes for performing export.
+ */
+ @Override
+ public To getTo() {
+ return TO;
+ }
+
+ /**
+ * Returns validation object that Sqoop framework can use to validate user
+ * supplied forms before accepting them. This object will be used both for
+ * connection and job forms.
+ *
+ * @return Validator object
+ */
+ @Override
+ public Validator getValidator() {
+ return hdfsValidator;
+ }
+
+ /**
+ * Returns an {@linkplain org.apache.sqoop.connector.spi.MetadataUpgrader} object that can upgrade the
+ * connection and job metadata.
+ *
+ * @return MetadataUpgrader object
+ */
+ @Override
+ public MetadataUpgrader getMetadataUpgrader() {
+ return null;
+ }
+}
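As a rough sketch (assumed, not part of this commit), the framework can interrogate the connector for its direction-specific classes like so:

import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.hdfs.HdfsConnector;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;

public class HdfsConnectorSketch {
  public static void main(String[] args) {
    HdfsConnector connector = new HdfsConnector();

    // FROM direction: HdfsInitializer -> HdfsPartitioner -> HdfsExtractor -> HdfsDestroyer
    From from = connector.getFrom();

    // TO direction: HdfsInitializer -> HdfsLoader -> HdfsDestroyer
    To to = connector.getTo();

    // Direction-specific job configuration classes
    System.out.println(connector.getJobConfigurationClass(Direction.FROM)); // FromJobConfiguration
    System.out.println(connector.getJobConfigurationClass(Direction.TO));   // ToJobConfiguration
  }
}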
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
new file mode 100644
index 0000000..8a095d2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum HdfsConnectorError implements ErrorCode{
+ /** Error occurs during partitioner run */
+ GENERIC_HDFS_CONNECTOR_0000("Error occurs during partitioner run"),
+ /** Error occurs during extractor run */
+ GENERIC_HDFS_CONNECTOR_0001("Error occurs during extractor run"),
+ /** Unsupported output format type found **/
+ GENERIC_HDFS_CONNECTOR_0002("Unknown output format type"),
+ /** The system was unable to load the specified class. */
+ GENERIC_HDFS_CONNECTOR_0003("Unable to load the specified class"),
+ /** The system was unable to instantiate the specified class. */
+ GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"),
+ /** Error occurs during loader run */
+ GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run")
+
+ ;
+
+ private final String message;
+
+ private HdfsConnectorError(String message) {
+ this.message = message;
+ }
+
+ public String getCode() {
+ return name();
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
new file mode 100644
index 0000000..a27aff1
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.job.Constants;
+
+public final class HdfsConstants extends Constants {
+
+ // Resource bundle name
+ public static final String RESOURCE_BUNDLE_NAME =
+ "hdfs-connector-resources";
+
+ public static final char DEFAULT_RECORD_DELIMITER = '\n';
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
new file mode 100644
index 0000000..74b1cb8
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class HdfsDestroyer extends Destroyer {
+ /**
+ * Callback to clean up after job execution.
+ *
+ * @param context Destroyer context
+ * @param o Connection configuration object
+ * @param o2 Job configuration object
+ */
+ @Override
+ public void destroy(DestroyerContext context, Object o, Object o2) {
+ //TODO: Add a "success" flag?
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
new file mode 100644
index 0000000..fc12381
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.util.LineReader;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.common.PrefixContext;
+
+import java.io.IOException;
+
+/**
+ * Extract from HDFS.
+ * Default field delimiter of a record is comma.
+ */
+
+
+public class HdfsExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, HdfsPartition> {
+
+ public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
+
+ private Configuration conf;
+ private DataWriter dataWriter;
+ private long rowRead = 0;
+
+ @Override
+ public void extract(ExtractorContext context,
+ ConnectionConfiguration connectionConfiguration,
+ FromJobConfiguration jobConfiguration, HdfsPartition partition) {
+
+ conf = ((PrefixContext) context.getContext()).getConfiguration();
+ dataWriter = context.getDataWriter();
+
+ try {
+ HdfsPartition p = partition;
+ LOG.info("Working on partition: " + p);
+ int numFiles = p.getNumberOfFiles();
+ for (int i = 0; i < numFiles; i++) {
+ extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+ }
+ } catch (IOException e) {
+ throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e);
+ }
+ }
+
+ private void extractFile(Path file, long start, long length)
+ throws IOException {
+ long end = start + length;
+ LOG.info("Extracting file " + file);
+ LOG.info("\t from offset " + start);
+ LOG.info("\t to offset " + end);
+ LOG.info("\t of length " + length);
+ if(isSequenceFile(file)) {
+ extractSequenceFile(file, start, length);
+ } else {
+ extractTextFile(file, start, length);
+ }
+ }
+
+ /**
+ * Extracts Sequence file
+ * @param file
+ * @param start
+ * @param length
+ * @throws IOException
+ */
+ private void extractSequenceFile(Path file, long start, long length)
+ throws IOException {
+ LOG.info("Extracting sequence file");
+ long end = start + length;
+ SequenceFile.Reader filereader = new SequenceFile.Reader(
+ file.getFileSystem(conf), file, conf);
+
+ if (start > filereader.getPosition()) {
+ filereader.sync(start); // sync to start
+ }
+
+ Text line = new Text();
+ boolean hasNext = filereader.next(line);
+ while (hasNext) {
+ rowRead++;
+ dataWriter.writeStringRecord(line.toString());
+ line = new Text();
+ hasNext = filereader.next(line);
+ if (filereader.getPosition() >= end && filereader.syncSeen()) {
+ break;
+ }
+ }
+ filereader.close();
+ }
+
+ /**
+ * Extracts Text file
+ * @param file
+ * @param start
+ * @param length
+ * @throws IOException
+ */
+ private void extractTextFile(Path file, long start, long length)
+ throws IOException {
+ LOG.info("Extracting text file");
+ long end = start + length;
+ FileSystem fs = file.getFileSystem(conf);
+ FSDataInputStream filestream = fs.open(file);
+ CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
+ LineReader filereader;
+ Seekable fileseeker = filestream;
+
+ // Hadoop 1.0 does not have support for custom record delimiters and thus
+ // we are supporting only the default one.
+ // We might add another "else if" case for SplittableCompressionCodec once
+ // we drop support for Hadoop 1.0.
+ if (codec == null) {
+ filestream.seek(start);
+ filereader = new LineReader(filestream);
+ } else {
+ filereader = new LineReader(codec.createInputStream(filestream,
+ codec.createDecompressor()), conf);
+ fileseeker = filestream;
+ }
+ if (start != 0) {
+ // always throw away first record because
+ // one extra line is read in previous split
+ start += filereader.readLine(new Text(), 0);
+ }
+ int size;
+ LOG.info("Start position: " + String.valueOf(start));
+ long next = start;
+ while (next <= end) {
+ Text line = new Text();
+ size = filereader.readLine(line, Integer.MAX_VALUE);
+ if (size == 0) {
+ break;
+ }
+ if (codec == null) {
+ next += size;
+ } else {
+ next = fileseeker.getPos();
+ }
+ rowRead++;
+ dataWriter.writeStringRecord(line.toString());
+ }
+ LOG.info("Extracting ended on position: " + fileseeker.getPos());
+ filestream.close();
+ }
+
+ @Override
+ public long getRowsRead() {
+ return rowRead;
+ }
+
+ /**
+ * Returns true if given file is sequence
+ * @param file
+ * @return boolean
+ */
+ private boolean isSequenceFile(Path file) {
+ SequenceFile.Reader filereader = null;
+ try {
+ filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
+ filereader.close();
+ } catch (IOException e) {
+ return false;
+ }
+ return true;
+ }
+
+
+}
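Note on the text-file path above: every partition that does not start at offset 0 discards its first (possibly partial) line, and the read loop keeps going until the reader has moved past the partition end, so a record that straddles a partition boundary is emitted exactly once, by the partition in which it begins. This mirrors the standard Hadoop split-reading convention.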
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
new file mode 100644
index 0000000..d2d12a8
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+
+
+public class HdfsInitializer extends Initializer {
+ /**
+ * Initialize a new submission based on the given configuration properties. Any
+ * temporary values that are needed may be saved to the context object; they
+ * will be promoted to all other parts of the workflow automatically.
+ *
+ * @param context Initializer context object
+ * @param connection Connector's connection configuration object
+ * @param job Connector's job configuration object
+ */
+ @Override
+ public void initialize(InitializerContext context, Object connection, Object job) {
+
+ }
+
+
+ @Override
+ public Schema getSchema(InitializerContext context, Object connection, Object job) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
new file mode 100644
index 0000000..5a924f9
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.utils.ClassUtils;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class HdfsLoader extends Loader<ConnectionConfiguration, ToJobConfiguration> {
+ /**
+ * Load data to target.
+ *
+ * @param context Loader context object
+ * @param connection Connection configuration
+ * @param job Job configuration
+ * @throws Exception
+ */
+ @Override
+ public void load(LoaderContext context, ConnectionConfiguration connection, ToJobConfiguration job) throws Exception {
+
+ DataReader reader = context.getDataReader();
+
+ Configuration conf = new Configuration();
+
+ String directoryName = job.output.outputDirectory;
+ String codecname = getCompressionCodecName(job);
+
+ CompressionCodec codec = null;
+ if (codecname != null) {
+ Class<?> clz = ClassUtils.loadClass(codecname);
+ if (clz == null) {
+ throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname);
+ }
+
+ try {
+ codec = (CompressionCodec) clz.newInstance();
+ if (codec instanceof Configurable) {
+ ((Configurable) codec).setConf(conf);
+ }
+ } catch (Exception e) {
+ throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e);
+ }
+ }
+
+ String filename = directoryName + "/" + UUID.randomUUID() + getExtension(job,codec);
+
+ try {
+ Path filepath = new Path(filename);
+
+ GenericHdfsWriter filewriter = getWriter(job);
+
+ filewriter.initialize(filepath,conf,codec);
+
+ String csv;
+
+ while ((csv = reader.readTextRecord()) != null) {
+ filewriter.write(csv);
+ }
+ filewriter.destroy();
+
+ } catch (IOException e) {
+ throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e);
+ }
+
+ }
+
+ private GenericHdfsWriter getWriter(ToJobConfiguration job) {
+ if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE)
+ return new HdfsSequenceWriter();
+ else
+ return new HdfsTextWriter();
+ }
+
+
+ private String getCompressionCodecName(ToJobConfiguration jobConf) {
+ if(jobConf.output.compression == null)
+ return null;
+ switch(jobConf.output.compression) {
+ case NONE:
+ return null;
+ case DEFAULT:
+ return "org.apache.hadoop.io.compress.DefaultCodec";
+ case DEFLATE:
+ return "org.apache.hadoop.io.compress.DeflateCodec";
+ case GZIP:
+ return "org.apache.hadoop.io.compress.GzipCodec";
+ case BZIP2:
+ return "org.apache.hadoop.io.compress.BZip2Codec";
+ case LZO:
+ return "com.hadoop.compression.lzo.LzoCodec";
+ case LZ4:
+ return "org.apache.hadoop.io.compress.Lz4Codec";
+ case SNAPPY:
+ return "org.apache.hadoop.io.compress.SnappyCodec";
+ case CUSTOM:
+ return jobConf.output.customCompression.trim();
+ }
+ return null;
+ }
+
+ //TODO: We should probably support configurable extensions at some point
+ private static String getExtension(ToJobConfiguration job, CompressionCodec codec) {
+ if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE)
+ return ".seq";
+ if (codec == null)
+ return ".txt";
+ return codec.getDefaultExtension();
+ }
+
+}
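Reading the loader above, the output file name is the configured output directory plus a random UUID plus an extension: ".seq" for sequence files, ".txt" for uncompressed text, or the codec's default extension otherwise (for example ".gz" when GzipCodec is selected).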
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
new file mode 100644
index 0000000..b801356
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.hdfs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.job.etl.Partition;
+
+/**
+ * This class derives mostly from CombineFileSplit of Hadoop, i.e.
+ * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
+ */
+public class HdfsPartition extends Partition {
+
+ private long lenFiles;
+ private int numFiles;
+ private Path[] files;
+ private long[] offsets;
+ private long[] lengths;
+ private String[] locations;
+
+ public HdfsPartition() {}
+
+ public HdfsPartition(Path[] files, long[] offsets,
+ long[] lengths, String[] locations) {
+ for(long length : lengths) {
+ this.lenFiles += length;
+ }
+ this.numFiles = files.length;
+ this.files = files;
+ this.offsets = offsets;
+ this.lengths = lengths;
+ this.locations = locations;
+ }
+
+ public long getLengthOfFiles() {
+ return lenFiles;
+ }
+
+ public int getNumberOfFiles() {
+ return numFiles;
+ }
+
+ public Path getFile(int i) {
+ return files[i];
+ }
+
+ public long getOffset(int i) {
+ return offsets[i];
+ }
+
+ public long getLength(int i) {
+ return lengths[i];
+ }
+
+ public String[] getLocations() {
+ return locations;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ numFiles = in.readInt();
+
+ files = new Path[numFiles];
+ for(int i=0; i<numFiles; i++) {
+ files[i] = new Path(in.readUTF());
+ }
+
+ offsets = new long[numFiles];
+ for(int i=0; i<numFiles; i++) {
+ offsets[i] = in.readLong();
+ }
+
+ lengths = new long[numFiles];
+ for(int i=0; i<numFiles; i++) {
+ lengths[i] = in.readLong();
+ }
+
+ for(long length : lengths) {
+ lenFiles += length;
+ }
+
+ int numLocations = in.readInt();
+ if (numLocations == 0) {
+ locations = null;
+ } else {
+ locations = new String[numLocations];
+ for(int i=0; i<numLocations; i++) {
+ locations[i] = in.readUTF();
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(numFiles);
+
+ for(Path file : files) {
+ out.writeUTF(file.toString());
+ }
+
+ for(long offset : offsets) {
+ out.writeLong(offset);
+ }
+
+ for(long length : lengths) {
+ out.writeLong(length);
+ }
+
+ if (locations == null || locations.length == 0) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(locations.length);
+ for(String location : locations) {
+ out.writeUTF(location);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ boolean first = true;
+ for(int i = 0; i < files.length; i++) {
+ if(first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+
+ sb.append(files[i]);
+ sb.append(" (offset=").append(offsets[i]);
+ sb.append(", end=").append(offsets[i] + lengths[i]);
+ sb.append(", length=").append(lengths[i]);
+ sb.append(")");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+}
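A small round-trip sketch (assumed, not part of the commit) illustrating that HdfsPartition serializes its file, offset, length and location arrays through write(DataOutput) and restores them through readFields(DataInput):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.HdfsPartition;

public class HdfsPartitionRoundTrip {
  public static void main(String[] args) throws IOException {
    HdfsPartition original = new HdfsPartition(
        new Path[] { new Path("/data/part-00000"), new Path("/data/part-00001") },
        new long[] { 0L, 0L },              // offsets
        new long[] { 128L, 256L },          // lengths
        new String[] { "node1", "node2" }); // locations

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    HdfsPartition copy = new HdfsPartition();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(copy);                    // same files, offsets and lengths
    System.out.println(copy.getLengthOfFiles()); // 384
  }
}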
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
new file mode 100644
index 0000000..df764d2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+import org.apache.sqoop.common.PrefixContext;
+
+/**
+ * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
+ * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
+ */
+public class HdfsPartitioner extends Partitioner<ConnectionConfiguration, FromJobConfiguration> {
+
+ public static final String SPLIT_MINSIZE_PERNODE =
+ "mapreduce.input.fileinputformat.split.minsize.per.node";
+ public static final String SPLIT_MINSIZE_PERRACK =
+ "mapreduce.input.fileinputformat.split.minsize.per.rack";
+
+ // ability to limit the size of a single split
+ private long maxSplitSize = 0;
+ private long minSplitSizeNode = 0;
+ private long minSplitSizeRack = 0;
+
+ // mapping from a rack name to the set of Nodes in the rack
+ private HashMap<String, Set<String>> rackToNodes =
+ new HashMap<String, Set<String>>();
+
+ @Override
+ public List<Partition> getPartitions(PartitionerContext context,
+ ConnectionConfiguration connectionConfiguration, FromJobConfiguration jobConfiguration) {
+
+ Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
+
+ try {
+ long numInputBytes = getInputSize(conf, jobConfiguration.input.inputDirectory);
+ maxSplitSize = numInputBytes / context.getMaxPartitions();
+
+ if(numInputBytes % context.getMaxPartitions() != 0 ) {
+ maxSplitSize += 1;
+ }
+
+ long minSizeNode = 0;
+ long minSizeRack = 0;
+ long maxSize = 0;
+
+ // the values specified by setxxxSplitSize() takes precedence over the
+ // values that might have been specified in the config
+ if (minSplitSizeNode != 0) {
+ minSizeNode = minSplitSizeNode;
+ } else {
+ minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
+ }
+ if (minSplitSizeRack != 0) {
+ minSizeRack = minSplitSizeRack;
+ } else {
+ minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
+ }
+ if (maxSplitSize != 0) {
+ maxSize = maxSplitSize;
+ } else {
+ maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+ }
+ if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+ throw new IOException("Minimum split size pernode " + minSizeNode +
+ " cannot be larger than maximum split size " +
+ maxSize);
+ }
+ if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+ throw new IOException("Minimum split size per rack" + minSizeRack +
+ " cannot be larger than maximum split size " +
+ maxSize);
+ }
+ if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+ throw new IOException("Minimum split size per node" + minSizeNode +
+ " cannot be smaller than minimum split " +
+ "size per rack " + minSizeRack);
+ }
+
+ // all the files in input set
+ String indir = jobConfiguration.input.inputDirectory;
+ FileSystem fs = FileSystem.get(conf);
+
+ List<Path> paths = new LinkedList<Path>();
+ for(FileStatus status : fs.listStatus(new Path(indir))) {
+ if(!status.isDir()) {
+ paths.add(status.getPath());
+ }
+ }
+
+ List<Partition> partitions = new ArrayList<Partition>();
+ if (paths.size() == 0) {
+ return partitions;
+ }
+
+ // create splits for all files that are not in any pool.
+ getMoreSplits(conf, paths,
+ maxSize, minSizeNode, minSizeRack, partitions);
+
+ // free up rackToNodes map
+ rackToNodes.clear();
+
+ return partitions;
+
+ } catch (IOException e) {
+ throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0000, e);
+ }
+ }
+
+ //TODO: Perhaps get the FS from connection configuration so we can support remote HDFS
+ private long getInputSize(Configuration conf, String indir) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] files = fs.listStatus(new Path(indir));
+ long count = 0;
+ for (FileStatus file : files) {
+ count += file.getLen();
+ }
+ return count;
+ }
+
+ /**
+ * Return all the splits in the specified set of paths
+ */
+ private void getMoreSplits(Configuration conf, List<Path> paths,
+ long maxSize, long minSizeNode, long minSizeRack,
+ List<Partition> partitions) throws IOException {
+
+ // all blocks for all the files in input set
+ OneFileInfo[] files;
+
+ // mapping from a rack name to the list of blocks it has
+ HashMap<String, List<OneBlockInfo>> rackToBlocks =
+ new HashMap<String, List<OneBlockInfo>>();
+
+ // mapping from a block to the nodes on which it has replicas
+ HashMap<OneBlockInfo, String[]> blockToNodes =
+ new HashMap<OneBlockInfo, String[]>();
+
+ // mapping from a node to the list of blocks that it contains
+ HashMap<String, List<OneBlockInfo>> nodeToBlocks =
+ new HashMap<String, List<OneBlockInfo>>();
+
+ files = new OneFileInfo[paths.size()];
+ if (paths.size() == 0) {
+ return;
+ }
+
+ // populate all the blocks for all files
+ for (int i = 0; i < paths.size(); i++) {
+ files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)),
+ rackToBlocks, blockToNodes, nodeToBlocks,
+ rackToNodes, maxSize);
+ }
+
+ ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+ Set<String> nodes = new HashSet<String>();
+ long curSplitSize = 0;
+
+ // process all nodes and create splits that are local
+ // to a node.
+ for (Iterator<Map.Entry<String,
+ List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
+ iter.hasNext();) {
+
+ Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+ nodes.add(one.getKey());
+ List<OneBlockInfo> blocksInNode = one.getValue();
+
+ // for each block, copy it into validBlocks. Delete it from
+ // blockToNodes so that the same block does not appear in
+ // two different splits.
+ for (OneBlockInfo oneblock : blocksInNode) {
+ if (blockToNodes.containsKey(oneblock)) {
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(partitions, nodes, validBlocks);
+ curSplitSize = 0;
+ validBlocks.clear();
+ }
+ }
+ }
+ // if there were any blocks left over and their combined size is
+ // larger than minSplitNode, then combine them into one split.
+ // Otherwise add them back to the unprocessed pool. It is likely
+ // that they will be combined with other blocks from the
+ // same rack later on.
+ if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(partitions, nodes, validBlocks);
+ } else {
+ for (OneBlockInfo oneblock : validBlocks) {
+ blockToNodes.put(oneblock, oneblock.hosts);
+ }
+ }
+ validBlocks.clear();
+ nodes.clear();
+ curSplitSize = 0;
+ }
+
+ // if blocks in a rack are below the specified minimum size, then keep them
+ // in 'overflow'. After the processing of all racks is complete, these
+ // overflow blocks will be combined into splits.
+ ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+ Set<String> racks = new HashSet<String>();
+
+ // Process all racks over and over again until there is no more work to do.
+ while (blockToNodes.size() > 0) {
+
+ // Create one split for this rack before moving over to the next rack.
+ // Come back to this rack after creating a single split for each of the
+ // remaining racks.
+ // Process one rack location at a time, Combine all possible blocks that
+ // reside on this rack as one split. (constrained by minimum and maximum
+ // split size).
+
+ // iterate over all racks
+ for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
+ rackToBlocks.entrySet().iterator(); iter.hasNext();) {
+
+ Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+ racks.add(one.getKey());
+ List<OneBlockInfo> blocks = one.getValue();
+
+ // for each block, copy it into validBlocks. Delete it from
+ // blockToNodes so that the same block does not appear in
+ // two different splits.
+ boolean createdSplit = false;
+ for (OneBlockInfo oneblock : blocks) {
+ if (blockToNodes.containsKey(oneblock)) {
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(partitions, getHosts(racks), validBlocks);
+ createdSplit = true;
+ break;
+ }
+ }
+ }
+
+ // if we created a split, then just go to the next rack
+ if (createdSplit) {
+ curSplitSize = 0;
+ validBlocks.clear();
+ racks.clear();
+ continue;
+ }
+
+ if (!validBlocks.isEmpty()) {
+ if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+ // if there is a minimum size specified, then create a single split
+ // otherwise, store these blocks into overflow data structure
+ addCreatedSplit(partitions, getHosts(racks), validBlocks);
+ } else {
+ // There were a few blocks in this rack that
+ // remained to be processed. Keep them in 'overflow' block list.
+ // These will be combined later.
+ overflowBlocks.addAll(validBlocks);
+ }
+ }
+ curSplitSize = 0;
+ validBlocks.clear();
+ racks.clear();
+ }
+ }
+
+ assert blockToNodes.isEmpty();
+ assert curSplitSize == 0;
+ assert validBlocks.isEmpty();
+ assert racks.isEmpty();
+
+ // Process all overflow blocks
+ for (OneBlockInfo oneblock : overflowBlocks) {
+ validBlocks.add(oneblock);
+ curSplitSize += oneblock.length;
+
+ // This might cause an existing rack location to be re-added,
+ // but it should be ok.
+ for (int i = 0; i < oneblock.racks.length; i++) {
+ racks.add(oneblock.racks[i]);
+ }
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(partitions, getHosts(racks), validBlocks);
+ curSplitSize = 0;
+ validBlocks.clear();
+ racks.clear();
+ }
+ }
+
+ // Process any remaining blocks, if any.
+ if (!validBlocks.isEmpty()) {
+ addCreatedSplit(partitions, getHosts(racks), validBlocks);
+ }
+ }
+
+ private boolean isSplitable(Configuration conf, Path file) {
+ final CompressionCodec codec =
+ new CompressionCodecFactory(conf).getCodec(file);
+
+ // This method might be improved for SplittableCompression codec when we
+ // drop support for Hadoop 1.0
+ return null == codec;
+
+ }
+
+ /**
+ * Create a single split from the list of blocks specified in validBlocks
+ * Add this new split into list.
+ */
+ private void addCreatedSplit(List<Partition> partitions,
+ Collection<String> locations,
+ ArrayList<OneBlockInfo> validBlocks) {
+ // create an input split
+ Path[] files = new Path[validBlocks.size()];
+ long[] offsets = new long[validBlocks.size()];
+ long[] lengths = new long[validBlocks.size()];
+ for (int i = 0; i < validBlocks.size(); i++) {
+ files[i] = validBlocks.get(i).onepath;
+ offsets[i] = validBlocks.get(i).offset;
+ lengths[i] = validBlocks.get(i).length;
+ }
+
+ // add this split to the list that is returned
+ HdfsPartition partition = new HdfsPartition(
+ files, offsets, lengths, locations.toArray(new String[0]));
+ partitions.add(partition);
+ }
+
+ private Set<String> getHosts(Set<String> racks) {
+ Set<String> hosts = new HashSet<String>();
+ for (String rack : racks) {
+ if (rackToNodes.containsKey(rack)) {
+ hosts.addAll(rackToNodes.get(rack));
+ }
+ }
+ return hosts;
+ }
+
+ private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+ String rack, String host) {
+ Set<String> hosts = rackToNodes.get(rack);
+ if (hosts == null) {
+ hosts = new HashSet<String>();
+ rackToNodes.put(rack, hosts);
+ }
+ hosts.add(host);
+ }
+
+ /**
+ * information about one file from the File System
+ */
+ private static class OneFileInfo {
+ private long fileSize; // size of the file
+ private OneBlockInfo[] blocks; // all blocks in this file
+
+ OneFileInfo(Path path, Configuration conf,
+ boolean isSplitable,
+ HashMap<String, List<OneBlockInfo>> rackToBlocks,
+ HashMap<OneBlockInfo, String[]> blockToNodes,
+ HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+ HashMap<String, Set<String>> rackToNodes,
+ long maxSize)
+ throws IOException {
+ this.fileSize = 0;
+
+ // get block locations from file system
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus stat = fs.getFileStatus(path);
+ BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
+ stat.getLen());
+ // create a list of all block and their locations
+ if (locations == null) {
+ blocks = new OneBlockInfo[0];
+ } else {
+ if (!isSplitable) {
+ // if the file is not splitable, just create the one block with
+ // full file length
+ blocks = new OneBlockInfo[1];
+ fileSize = stat.getLen();
+ blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+ .getHosts(), locations[0].getTopologyPaths());
+ } else {
+ ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+ locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ fileSize += locations[i].getLength();
+
+ // each split can be a maximum of maxSize
+ long left = locations[i].getLength();
+ long myOffset = locations[i].getOffset();
+ long myLength = 0;
+ do {
+ if (maxSize == 0) {
+ myLength = left;
+ } else {
+ if (left > maxSize && left < 2 * maxSize) {
+ // if remainder is between max and 2*max - then
+ // instead of creating splits of size max, left-max we
+ // create splits of size left/2 and left/2. This is
+ // a heuristic to avoid creating really really small
+ // splits.
+ myLength = left / 2;
+ } else {
+ myLength = Math.min(maxSize, left);
+ }
+ }
+ OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+ myLength, locations[i].getHosts(), locations[i]
+ .getTopologyPaths());
+ left -= myLength;
+ myOffset += myLength;
+
+ blocksList.add(oneblock);
+ } while (left > 0);
+ }
+ blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+ }
+
+ for (OneBlockInfo oneblock : blocks) {
+ // add this block to the block --> node locations map
+ blockToNodes.put(oneblock, oneblock.hosts);
+
+ // For blocks that do not have host/rack information,
+ // assign to default rack.
+ String[] racks = null;
+ if (oneblock.hosts.length == 0) {
+ racks = new String[]{NetworkTopology.DEFAULT_RACK};
+ } else {
+ racks = oneblock.racks;
+ }
+
+ // add this block to the rack --> block map
+ for (int j = 0; j < racks.length; j++) {
+ String rack = racks[j];
+ List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ rackToBlocks.put(rack, blklist);
+ }
+ blklist.add(oneblock);
+ if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+ // Add this host to rackToNodes map
+ addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+ }
+ }
+
+ // add this block to the node --> block map
+ for (int j = 0; j < oneblock.hosts.length; j++) {
+ String node = oneblock.hosts[j];
+ List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ nodeToBlocks.put(node, blklist);
+ }
+ blklist.add(oneblock);
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Information about one block from the file system.
+ */
+ private static class OneBlockInfo {
+ Path onepath; // path of the file this block belongs to
+ long offset; // offset in file
+ long length; // length of this block
+ String[] hosts; // nodes on which this block resides
+ String[] racks; // network topology of hosts
+
+ OneBlockInfo(Path path, long offset, long len,
+ String[] hosts, String[] topologyPaths) {
+ this.onepath = path;
+ this.offset = offset;
+ this.hosts = hosts;
+ this.length = len;
+ assert (hosts.length == topologyPaths.length ||
+ topologyPaths.length == 0);
+
+ // if the file system does not have any rack information, then
+ // use dummy rack location.
+ if (topologyPaths.length == 0) {
+ topologyPaths = new String[hosts.length];
+ for (int i = 0; i < topologyPaths.length; i++) {
+ topologyPaths[i] = (new NodeBase(hosts[i],
+ NetworkTopology.DEFAULT_RACK)).toString();
+ }
+ }
+
+ // The topology paths have the host name included as the last
+ // component. Strip it.
+ this.racks = new String[topologyPaths.length];
+ for (int i = 0; i < topologyPaths.length; i++) {
+ this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+ }
+ }
+ }
+
+}
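Note on the split-sizing heuristic in OneFileInfo above: when the bytes remaining in a block
fall between maxSize and 2 * maxSize, the partitioner emits two splits of roughly left / 2
instead of one split of maxSize plus a small remainder. A minimal standalone sketch of that
loop, with hypothetical sizes (not part of the connector code):

  import java.util.ArrayList;
  import java.util.List;

  public class SplitSizeSketch {
    public static void main(String[] args) {
      long maxSize = 128L * 1024 * 1024;   // hypothetical 128 MB split ceiling
      long left = 200L * 1024 * 1024;      // hypothetical 200 MB of a block left to assign
      List<Long> splits = new ArrayList<Long>();
      do {
        long myLength;
        if (maxSize == 0) {
          myLength = left;                               // no limit: take everything
        } else if (left > maxSize && left < 2 * maxSize) {
          myLength = left / 2;                           // split 200 MB as 100 MB + 100 MB
        } else {
          myLength = Math.min(maxSize, left);
        }
        splits.add(myLength);
        left -= myLength;
      } while (left > 0);
      System.out.println(splits);                        // [104857600, 104857600]
    }
  }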
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
new file mode 100644
index 0000000..4efbd33
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.hdfs.configuration.*;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
+import org.apache.sqoop.validation.Validator;
+
+/**
+ * Validate HDFS connector configuration objects
+ */
+public class HdfsValidator extends Validator {
+
+ @Override
+ public Validation validateConnection(Object connectionConfiguration) {
+ Validation validation = new Validation(ConnectionConfiguration.class);
+ // No validation on connection object
+ return validation;
+ }
+
+ @Override
+ public Validation validateJob(Object jobConfiguration) {
+ // Dispatch on the configuration type: FromJobConfiguration describes reading from HDFS,
+ // ToJobConfiguration describes writing to HDFS.
+ if (jobConfiguration instanceof FromJobConfiguration) {
+ return validateExportJob(jobConfiguration);
+ }
+ if (jobConfiguration instanceof ToJobConfiguration) {
+ return validateImportJob(jobConfiguration);
+ }
+ return super.validateJob(jobConfiguration);
+ }
+
+ private Validation validateExportJob(Object jobConfiguration) {
+ Validation validation = new Validation(FromJobConfiguration.class);
+ FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
+
+ validateInputForm(validation, configuration.input);
+
+ return validation;
+ }
+
+ private Validation validateImportJob(Object jobConfiguration) {
+ Validation validation = new Validation(ToJobConfiguration.class);
+ ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
+
+ validateOutputForm(validation, configuration.output);
+
+ return validation;
+ }
+
+ private void validateInputForm(Validation validation, InputForm input) {
+ if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
+ validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
+ }
+ }
+
+ private void validateOutputForm(Validation validation, OutputForm output) {
+ if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
+ validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
+ }
+ if(output.customCompression != null &&
+ output.customCompression.trim().length() > 0 &&
+ output.compression != OutputCompression.CUSTOM) {
+ validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+ "custom compression should be blank as " + output.compression + " is being used.");
+ }
+ if(output.compression == OutputCompression.CUSTOM &&
+ (output.customCompression == null ||
+ output.customCompression.trim().length() == 0)
+ ) {
+ validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+ "custom compression is blank.");
+ }
+ }
+
+}
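The two compression checks in validateOutputForm above reduce to a single rule:
customCompression must be blank unless compression is CUSTOM, and must be non-blank when it is.
A standalone sketch of that rule with illustrative values (the codec class name is hypothetical):

  public class CompressionRuleSketch {

    // Illustrative copy of the values involved; the real enum is
    // org.apache.sqoop.connector.hdfs.configuration.OutputCompression.
    enum OutputCompression { NONE, DEFAULT, GZIP, CUSTOM }

    // Mirrors the two UNACCEPTABLE conditions in HdfsValidator#validateOutputForm.
    static boolean isValid(OutputCompression compression, String customCompression) {
      boolean customBlank = customCompression == null || customCompression.trim().isEmpty();
      if (compression == OutputCompression.CUSTOM) {
        return !customBlank;  // CUSTOM requires a codec class name
      }
      return customBlank;     // any other choice requires the custom field to stay blank
    }

    public static void main(String[] args) {
      System.out.println(isValid(OutputCompression.GZIP, ""));                       // true
      System.out.println(isValid(OutputCompression.GZIP, "org.example.MyCodec"));    // false
      System.out.println(isValid(OutputCompression.CUSTOM, "org.example.MyCodec"));  // true
      System.out.println(isValid(OutputCompression.CUSTOM, null));                   // false
    }
  }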
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
new file mode 100644
index 0000000..6dd79d5
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class ConnectionConfiguration {
+ @Form
+ public ConnectionForm connection;
+
+ public ConnectionConfiguration() {
+ connection = new ConnectionForm();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
new file mode 100644
index 0000000..7dad2a2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+@FormClass
+public class ConnectionForm {
+ // TODO: No connection-level inputs are needed yet. Empty forms are rejected by the
+ // repository (DERBYREPO_0008: The form contains no input metadata), so a dummy input
+ // is used as a placeholder.
+
+ @Input(size = 255) public String dummy;
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
new file mode 100644
index 0000000..bccb99d
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class FromJobConfiguration {
+ @Form public InputForm input;
+
+
+ public FromJobConfiguration() {
+ input = new InputForm();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
new file mode 100644
index 0000000..413f04c
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Input form for the HDFS connector: the directory to read data from.
+ */
+@FormClass
+public class InputForm {
+
+ @Input(size = 255) public String inputDirectory;
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
new file mode 100644
index 0000000..55db1bc
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Supported compressions
+ */
+public enum OutputCompression {
+ NONE,
+ DEFAULT,
+ DEFLATE,
+ GZIP,
+ BZIP2,
+ LZO,
+ LZ4,
+ SNAPPY,
+ CUSTOM,
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
new file mode 100644
index 0000000..d57b4c2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Output form for the HDFS connector: output format, compression and target directory.
+ */
+@FormClass
+public class OutputForm {
+
+ @Input public OutputFormat outputFormat;
+
+ @Input public OutputCompression compression;
+
+ @Input(size = 255) public String customCompression;
+
+ @Input(size = 255) public String outputDirectory;
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
new file mode 100644
index 0000000..676c33c
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Various supported formats on disk
+ */
+public enum OutputFormat {
+ /**
+ * Comma separated text file
+ */
+ TEXT_FILE,
+
+ /**
+ * Sequence file
+ */
+ SEQUENCE_FILE,
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
new file mode 100644
index 0000000..d4aaa0a
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Various storage types that Sqoop supports
+ */
+public enum StorageType {
+ /**
+ * Direct HDFS import
+ */
+ HDFS,
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..65ee8a7
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class ToJobConfiguration {
+ @Form
+ public OutputForm output;
+
+ public ToJobConfiguration() {
+ output = new OutputForm();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
new file mode 100644
index 0000000..2ccccc4
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import java.io.IOException;
+
+public abstract class GenericHdfsWriter {
+
+ public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException;
+
+ public abstract void write(String csv) throws IOException;
+
+ public abstract void destroy() throws IOException;
+
+}
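HdfsSequenceWriter and HdfsTextWriter (later in this commit) are the real implementations of
this contract; purely for orientation, a minimal uncompressed text writer satisfying the three
lifecycle methods could look like the sketch below (illustrative only; compression handling is
omitted):

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.compress.CompressionCodec;
  import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;

  // Illustrative subclass only; see HdfsTextWriter/HdfsSequenceWriter for the committed writers.
  public class PlainTextHdfsWriter extends GenericHdfsWriter {

    private FSDataOutputStream filestream;

    @Override
    public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
      // Compression is ignored in this sketch; a real writer would wrap the stream with the codec.
      FileSystem fs = filepath.getFileSystem(conf);
      filestream = fs.create(filepath, false);
    }

    @Override
    public void write(String csv) throws IOException {
      filestream.writeBytes(csv + "\n");  // one CSV record per line
    }

    @Override
    public void destroy() throws IOException {
      filestream.close();
    }
  }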