You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/23 07:53:27 UTC
git commit: SQOOP-1554: Add NullConfigurationClass/
EmptyConfigurationClass to support use cases that do not have a particular
type of config
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 151a0a12a -> 8c604754a
SQOOP-1554: Add NullConfigurationClass/EmptyConfigurationClass to support use cases that do not have a particular type of config
(Veena Basavaraj via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/8c604754
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/8c604754
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/8c604754
Branch: refs/heads/sqoop2
Commit: 8c604754a4a8f17faaf9f946962976b937547f22
Parents: 151a0a1
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Oct 22 22:52:07 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Oct 22 22:52:07 2014 -0700
----------------------------------------------------------------------
.../sqoop/connector/hdfs/HdfsConnector.java | 20 ++++----
.../sqoop/connector/hdfs/HdfsDestroyer.java | 36 -------------
.../sqoop/connector/hdfs/HdfsExtractor.java | 22 ++++----
.../sqoop/connector/hdfs/HdfsFromDestroyer.java | 38 ++++++++++++++
.../connector/hdfs/HdfsFromInitializer.java | 48 +++++++++++++++++
.../sqoop/connector/hdfs/HdfsInitializer.java | 44 ----------------
.../apache/sqoop/connector/hdfs/HdfsLoader.java | 12 ++---
.../sqoop/connector/hdfs/HdfsPartitioner.java | 20 ++++----
.../sqoop/connector/hdfs/HdfsToDestroyer.java | 38 ++++++++++++++
.../sqoop/connector/hdfs/HdfsToInitializer.java | 48 +++++++++++++++++
.../hdfs/configuration/LinkConfig.java | 29 -----------
.../hdfs/configuration/LinkConfiguration.java | 31 -----------
.../resources/hdfs-connector-config.properties | 8 ---
.../sqoop/connector/hdfs/TestExtractor.java | 28 +++++-----
.../apache/sqoop/connector/hdfs/TestLoader.java | 34 ++++++------
.../sqoop/connector/hdfs/TestPartitioner.java | 25 ++++-----
.../connector/common/EmptyConfiguration.java | 29 +++++++++++
.../org/apache/sqoop/driver/JobManager.java | 34 ++++++------
.../org/apache/sqoop/job/TestMapReduce.java | 21 ++++----
.../java/org/apache/sqoop/job/TestMatching.java | 9 ++--
.../org/apache/sqoop/job/etl/Destroyer.java | 4 +-
.../org/apache/sqoop/job/etl/Extractor.java | 10 ++--
.../org/apache/sqoop/job/etl/Initializer.java | 54 ++++++++++++--------
.../java/org/apache/sqoop/job/etl/Loader.java | 9 ++--
.../org/apache/sqoop/job/etl/Partitioner.java | 10 ++--
25 files changed, 365 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index e63e464..cce0e29 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -18,10 +18,13 @@
package org.apache.sqoop.connector.hdfs;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.VersionInfo;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
@@ -30,21 +33,18 @@ import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.validation.Validator;
-import java.util.Locale;
-import java.util.ResourceBundle;
-
public class HdfsConnector extends SqoopConnector {
private static final From FROM = new From(
- HdfsInitializer.class,
+ HdfsFromInitializer.class,
HdfsPartitioner.class,
HdfsExtractor.class,
- HdfsDestroyer.class);
+ HdfsFromDestroyer.class);
private static final To TO = new To(
- HdfsInitializer.class,
+ HdfsToInitializer.class,
HdfsLoader.class,
- HdfsDestroyer.class);
+ HdfsToDestroyer.class);
private static final HdfsValidator hdfsValidator = new HdfsValidator();
@@ -71,15 +71,17 @@ public class HdfsConnector extends SqoopConnector {
/**
* @return Get connection configuration class
*/
+ @SuppressWarnings("rawtypes")
@Override
public Class getLinkConfigurationClass() {
- return LinkConfiguration.class;
+ return EmptyConfiguration.class;
}
/**
* @param jobType
* @return Get job configuration class for given type or null if not supported
*/
+ @SuppressWarnings("rawtypes")
@Override
public Class getJobConfigurationClass(Direction jobType) {
switch (jobType) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
deleted file mode 100644
index 74b1cb8..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs;
-
-import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.DestroyerContext;
-
-public class HdfsDestroyer extends Destroyer {
- /**
- * Callback to clean up after job execution.
- *
- * @param context Destroyer context
- * @param o Connection configuration object
- * @param o2 Job configuration object
- */
- @Override
- public void destroy(DestroyerContext context, Object o, Object o2) {
- //TODO: Add a "success" flag?
-
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 2c8b6c8..31b0a99 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -17,6 +17,9 @@
*/
package org.apache.sqoop.connector.hdfs;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -26,23 +29,20 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
-import org.apache.log4j.Logger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.common.PrefixContext;
-
-import java.io.IOException;
/**
* Extract from HDFS.
* Default field delimiter of a record is comma.
*/
-public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> {
+public class HdfsExtractor extends Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> {
public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
@@ -51,9 +51,8 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
private long rowRead = 0;
@Override
- public void extract(ExtractorContext context,
- LinkConfiguration linkConfig,
- FromJobConfiguration fromJobConfig, HdfsPartition partition) {
+ public void extract(ExtractorContext context, EmptyConfiguration linkConfig,
+ FromJobConfiguration jobConfig, HdfsPartition partition) {
conf = ((PrefixContext) context.getContext()).getConfiguration();
dataWriter = context.getDataWriter();
@@ -91,6 +90,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
* @param length
* @throws IOException
*/
+ @SuppressWarnings("deprecation")
private void extractSequenceFile(Path file, long start, long length)
throws IOException {
LOG.info("Extracting sequence file");
@@ -123,6 +123,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
* @param length
* @throws IOException
*/
+ @SuppressWarnings("resource")
private void extractTextFile(Path file, long start, long length)
throws IOException {
LOG.info("Extracting text file");
@@ -182,6 +183,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
* @param file
* @return boolean
*/
+ @SuppressWarnings("deprecation")
private boolean isSequenceFile(Path file) {
SequenceFile.Reader filereader = null;
try {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
new file mode 100644
index 0000000..c7d35f7
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class HdfsFromDestroyer extends Destroyer<EmptyConfiguration, FromJobConfiguration> {
+ /**
+ * Callback to clean up after job execution.
+ *
+ * @param context Destroyer context
+ * @param linkConfig link configuration object
+ * @param jobConfig FROM job configuration object
+ */
+ @Override
+ public void destroy(DestroyerContext context, EmptyConfiguration linkConfig,
+ FromJobConfiguration jobConfig) {
+ // do nothing at this point
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
new file mode 100644
index 0000000..0752510
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+
+
+public class HdfsFromInitializer extends Initializer<EmptyConfiguration, FromJobConfiguration> {
+ /**
+ * Initialize new submission based on given configuration properties. Any
+ * needed temporary values might be saved to context object and they will be
+ * promoted to all other parts of the workflow automatically.
+ *
+ * @param context Initializer context object
+ * @param linkConfig link configuration object
+ * @param jobConfig FROM job configuration object
+ */
+ @Override
+ public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
+ FromJobConfiguration jobConfig) {
+ // do nothing at this point
+ }
+
+ @Override
+ public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
+ FromJobConfiguration jobConfig) {
+ return new Schema("HDFS file");
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
deleted file mode 100644
index bb5e353..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs;
-
-import org.apache.sqoop.job.etl.Initializer;
-import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.schema.Schema;
-
-
-public class HdfsInitializer extends Initializer {
- /**
- * Initialize new submission based on given configuration properties. Any
- * needed temporary values might be saved to context object and they will be
- * promoted to all other part of the workflow automatically.
- *
- * @param context Initializer context object
- * @param linkConfig Connector's link configuration object
- * @param jobConf Connector's job configuration object
- */
- @Override
- public void initialize(InitializerContext context, Object linkConfig, Object jobConf) {
-
- }
-
- @Override
- public Schema getSchema(InitializerContext context, Object linkConfig, Object jobConfig) {
- return new Schema("HDFS file");
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 660418d..682349c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -17,13 +17,16 @@
*/
package org.apache.sqoop.connector.hdfs;
+import java.io.IOException;
+import java.util.UUID;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
@@ -34,10 +37,7 @@ import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.utils.ClassUtils;
-import java.io.IOException;
-import java.util.UUID;
-
-public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
+public class HdfsLoader extends Loader<EmptyConfiguration, ToJobConfiguration> {
/**
* Load data to target.
*
@@ -47,7 +47,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
* @throws Exception
*/
@Override
- public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception {
+ public void load(LoaderContext context, EmptyConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception {
DataReader reader = context.getDataReader();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index f40459f..daa7fe2 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -21,36 +21,36 @@ package org.apache.sqoop.connector.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
-import org.apache.sqoop.common.PrefixContext;
/**
* This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
* org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
*/
-public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {
+public class HdfsPartitioner extends Partitioner<EmptyConfiguration, FromJobConfiguration> {
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
@@ -68,7 +68,7 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
@Override
public List<Partition> getPartitions(PartitionerContext context,
- LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+ EmptyConfiguration emptyConfig, FromJobConfiguration fromJobConfig) {
Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
new file mode 100644
index 0000000..8bfd727
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class HdfsToDestroyer extends Destroyer<EmptyConfiguration, ToJobConfiguration> {
+ /**
+ * Callback to clean up after job execution.
+ *
+ * @param context Destroyer context
+ * @param linkConfig link configuration object
+ * @param jobConfig TO job configuration object
+ */
+ @Override
+ public void destroy(DestroyerContext context, EmptyConfiguration linkConfig,
+ ToJobConfiguration jobConfig) {
+ // do nothing at this point
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
new file mode 100644
index 0000000..e3d54b8
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+
+
+public class HdfsToInitializer extends Initializer<EmptyConfiguration, ToJobConfiguration> {
+ /**
+ * Initialize new submission based on given configuration properties. Any
+ * needed temporary values might be saved to context object and they will be
+ * promoted to all other parts of the workflow automatically.
+ *
+ * @param context Initializer context object
+ * @param linkConfig link configuration object
+ * @param jobConfig TO job configuration object
+ */
+ @Override
+ public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
+ ToJobConfiguration jobConfig) {
+ // do nothing at this point
+ }
+
+ @Override
+ public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
+ ToJobConfiguration jobConfig) {
+ return new Schema("HDFS file");
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
deleted file mode 100644
index 5d48a29..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs.configuration;
-
-import org.apache.sqoop.model.ConfigClass;
-import org.apache.sqoop.model.Input;
-
-@ConfigClass
-public class LinkConfig {
- //Todo: Didn't find anything that belongs here...
- // Since empty forms don't work (DERBYREPO_0008:The config contains no input metadata), I'm putting a dummy config here
-
- @Input(size = 255) public String dummy;
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
deleted file mode 100644
index c0cd336..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Config;
-
-@ConfigurationClass
-public class LinkConfiguration {
- @Config
- public LinkConfig linkConfig;
-
- public LinkConfiguration() {
- linkConfig = new LinkConfig();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 9b8c6ba..90bc8bc 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -16,14 +16,6 @@
# Generic HDFS Connector Resources
############################
-# Link Config
-#
-linkConfig.label = Link configuration
-linkConfig.help = You must supply the information requested in order to \
- create a connection object.
-
-linkConfig.dummy.label = Dummy parameter needed to get HDFS connector to register
-linkConfig.dummy.help = You can write anything here. Doesn't matter.
# To Job Config
#
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index c6d2f90..124c3df 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -17,12 +17,20 @@
*/
package org.apache.sqoop.connector.hdfs;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.etl.io.DataWriter;
@@ -35,14 +43,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
-
@RunWith(Parameterized.class)
public class TestExtractor extends TestHdfsBase {
private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
@@ -52,7 +52,7 @@ public class TestExtractor extends TestHdfsBase {
private ToFormat outputFileType;
private Class<? extends CompressionCodec> compressionClass;
private final String inputDirectory;
- private Extractor extractor;
+ private Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> extractor;
public TestExtractor(ToFormat outputFileType,
Class<? extends CompressionCodec> compressionClass)
@@ -131,13 +131,11 @@ public class TestExtractor extends TestHdfsBase {
}
});
- LinkConfiguration connConf = new LinkConfiguration();
-
- FromJobConfiguration jobConf = new FromJobConfiguration();
-
+ EmptyConfiguration emptyLinkConfig = new EmptyConfiguration();
+ FromJobConfiguration emptyJobConfig = new FromJobConfiguration();
HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
- extractor.extract(context, connConf, jobConf, partition);
+ extractor.extract(context, emptyLinkConfig, emptyJobConfig, partition);
for (int index = 0; index < NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE; ++index) {
Assert.assertTrue("Index " + (index + 1) + " was not visited", visited[index]);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index 552a751..8429e15 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -17,6 +17,16 @@
*/
package org.apache.sqoop.connector.hdfs;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -27,7 +37,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -41,16 +51,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
-
@RunWith(Parameterized.class)
public class TestLoader extends TestHdfsBase {
private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
@@ -121,25 +121,25 @@ public class TestLoader extends TestHdfsBase {
return null;
}
}, null);
- LinkConfiguration connConf = new LinkConfiguration();
+ EmptyConfiguration linkConf = new EmptyConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
jobConf.toJobConfig.outputDirectory = outputDirectory;
jobConf.toJobConfig.compression = compression;
jobConf.toJobConfig.outputFormat = outputFormat;
Path outputPath = new Path(outputDirectory);
- loader.load(context, connConf, jobConf);
+ loader.load(context, linkConf, jobConf);
Assert.assertEquals(1, fs.listStatus(outputPath).length);
for (FileStatus status : fs.listStatus(outputPath)) {
verifyOutput(fs, status.getPath());
}
- loader.load(context, connConf, jobConf);
+ loader.load(context, linkConf, jobConf);
Assert.assertEquals(2, fs.listStatus(outputPath).length);
- loader.load(context, connConf, jobConf);
- loader.load(context, connConf, jobConf);
- loader.load(context, connConf, jobConf);
+ loader.load(context, linkConf, jobConf);
+ loader.load(context, linkConf, jobConf);
+ loader.load(context, linkConf, jobConf);
Assert.assertEquals(5, fs.listStatus(outputPath).length);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
index 9d177ec..bef1984 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
@@ -17,12 +17,21 @@
*/
package org.apache.sqoop.connector.hdfs;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.job.etl.Partition;
@@ -30,17 +39,9 @@ import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.*;
-import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class TestPartitioner extends TestHdfsBase {
@@ -97,12 +98,12 @@ public class TestPartitioner extends TestHdfsBase {
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
- LinkConfiguration connConf = new LinkConfiguration();
+ EmptyConfiguration linkConf = new EmptyConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
jobConf.fromJobConfig.inputDirectory = inputDirectory;
- List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+ List<Partition> partitions = partitioner.getPartitions(context, linkConf, jobConf);
if (this.compressionClass == null) {
assertEquals(5, partitions.size());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java
new file mode 100644
index 0000000..60b9e93
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.common;
+
+import org.apache.sqoop.model.ConfigurationClass;
+
+/**
+ * Marker configuration class with no config entries, for use where no particular config is needed
+ *
+**/
+@ConfigurationClass
+public class EmptyConfiguration {
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 51e562c..e83002d 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -350,25 +350,25 @@ public class JobManager implements Reconfigurable {
jobRequest.setJobName(job.getName());
jobRequest.setJobId(job.getPersistenceId());
jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
- Class<? extends IntermediateDataFormat<?>> dataFormatClass =
- fromConnector.getIntermediateDataFormat();
+ Class<? extends IntermediateDataFormat<?>> dataFormatClass = fromConnector
+ .getIntermediateDataFormat();
jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
-
jobRequest.setFrom(fromConnector.getFrom());
jobRequest.setTo(toConnector.getTo());
+ // set all the jars
addStandardJars(jobRequest);
addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
addConnectorInitializerJars(jobRequest, Direction.FROM);
addConnectorInitializerJars(jobRequest, Direction.TO);
- Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
- Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
-
+ // call the initialize method
+ initializeConnector(jobRequest, Direction.FROM);
+ initializeConnector(jobRequest, Direction.TO);
- jobRequest.getSummary().setFromSchema(fromSchema);
- jobRequest.getSummary().setToSchema(toSchema);
+ jobRequest.getSummary().setFromSchema(getSchemaForConnector(jobRequest, Direction.FROM));
+ jobRequest.getSummary().setToSchema(getSchemaForConnector(jobRequest, Direction.TO));
LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
return jobRequest;
@@ -435,21 +435,22 @@ public class JobManager implements Reconfigurable {
}
return job;
}
-
- private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
+ private void initializeConnector(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction);
-
- // Initializer context
InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
// Initialize submission from the connector perspective
initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
jobRequest.getJobConfig(direction));
+ }
+ private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) {
+
+ Initializer initializer = getConnectorInitializer(jobRequest, direction);
+ InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
- return initializer.getSchema(initializerContext,
- jobRequest.getConnectorLinkConfig(direction),
+ return initializer.getSchema(initializerContext, jobRequest.getConnectorLinkConfig(direction),
jobRequest.getJobConfig(direction));
}
@@ -459,8 +460,7 @@ public class JobManager implements Reconfigurable {
InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
// Add job specific jars to
jobRequest.addJars(initializer.getJars(initializerContext,
- jobRequest.getConnectorLinkConfig(direction),
- jobRequest.getJobConfig(direction)));
+ jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)));
}
private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
@@ -698,4 +698,4 @@ public class JobManager implements Reconfigurable {
LOG.info("Ending submission manager update thread");
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 6d0dcb4..78ae4ec 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -17,6 +17,8 @@
*/
package org.apache.sqoop.job;
+import static org.junit.Assert.assertEquals;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -33,6 +35,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
@@ -57,8 +60,6 @@ import org.apache.sqoop.schema.type.Text;
import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
public class TestMapReduce {
private static final int START_PARTITION = 1;
@@ -170,9 +171,9 @@ public class TestMapReduce {
}
}
- public static class DummyExtractor extends Extractor {
+ public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
@Override
- public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
+ public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
context.getDataWriter().writeArrayRecord(new Object[] {
@@ -250,12 +251,12 @@ public class TestMapReduce {
}
}
- public static class DummyLoader extends Loader {
+ public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data expected = new Data();
@Override
- public void load(LoaderContext context, Object oc, Object oj) throws Exception{
+ public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{
String data;
while ((data = context.getDataReader().readTextRecord()) != null) {
expected.setContent(new Object[] {
@@ -269,22 +270,22 @@ public class TestMapReduce {
}
}
- public static class DummyFromDestroyer extends Destroyer {
+ public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
public static int count = 0;
@Override
- public void destroy(DestroyerContext context, Object o, Object o2) {
+ public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
count++;
}
}
- public static class DummyToDestroyer extends Destroyer {
+ public static class DummyToDestroyer extends Destroyer<EmptyConfiguration,EmptyConfiguration> {
public static int count = 0;
@Override
- public void destroy(DestroyerContext context, Object o, Object o2) {
+ public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
count++;
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 665a65b..04fb692 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -17,6 +17,8 @@
*/
package org.apache.sqoop.job;
+import static org.junit.Assert.assertEquals;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -34,6 +36,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
@@ -53,8 +56,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static org.junit.Assert.assertEquals;
-
@RunWith(Parameterized.class)
public class TestMatching {
@@ -193,9 +194,9 @@ public class TestMatching {
}
}
- public static class DummyExtractor extends Extractor {
+ public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, Partition> {
@Override
- public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
+ public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, Partition partition) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
context.getDataWriter().writeArrayRecord(new Object[] {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index a133106..e2d98ca 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -28,7 +28,9 @@ public abstract class Destroyer<LinkConfiguration, JobConfiguration> {
*
* @param context Destroyer context
* @param linkConfiguration link configuration object
- * @param jobConfiguration job configuration object
+ * @param jobConfiguration job configuration object for the FROM and TO
+ * In case of the FROM destroyer this will represent the FROM job configuration
+ * In case of the TO destroyer this will represent the TO job configuration
*/
public abstract void destroy(DestroyerContext context,
LinkConfiguration linkConfiguration,
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index d6c186d..85e91ef 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -21,20 +21,20 @@ package org.apache.sqoop.job.etl;
* This allows connector to extract data from a source system
* based on each partition.
*/
-public abstract class Extractor<LinkConfiguration, JobConfiguration, Partition> {
+public abstract class Extractor<LinkConfiguration, FromJobConfiguration, SqoopPartition> {
/**
* Extract data from source and pass them into the Sqoop.
*
* @param context Extractor context object
* @param linkConfiguration link configuration object
- * @param jobConfiguration job configuration object
- * @param partition Partition that this extract should work on
+ * @param jobConfiguration FROM job configuration object
+ * @param partition Partition that this extractor should work on
*/
public abstract void extract(ExtractorContext context,
LinkConfiguration linkConfiguration,
- JobConfiguration jobConfiguration,
- Partition partition);
+ FromJobConfiguration jobConfiguration,
+ SqoopPartition partition);
/**
* Return the number of rows read by the last call to
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 5c48fc3..d66b099 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -35,27 +35,39 @@ public abstract class Initializer<LinkConfiguration, JobConfiguration> {
*
* @param context Initializer context object
* @param linkConfiguration link configuration object
- * @param jobConfiguration job configuration object
+ * @param jobConfiguration job configuration object for the FROM and TO
+ * In case of the FROM initializer this will represent the FROM job configuration
+ * In case of the TO initializer this will represent the TO job configuration
*/
- public abstract void initialize(InitializerContext context,
- LinkConfiguration linkConfiguration,
- JobConfiguration jobConfiguration);
-
- /**
- * Return list of all jars that this particular connector needs to operate
- * on following job. This method will be called after running initialize
- * method.
- *
- * @return
- */
- public List<String> getJars(InitializerContext context,
- LinkConfiguration linkConfiguration,
- JobConfiguration jobConfiguration) {
- return new LinkedList<String>();
- }
-
- public abstract Schema getSchema(InitializerContext context,
- LinkConfiguration linkConfiguration,
- JobConfiguration jobConfiguration);
+ public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration,
+ JobConfiguration jobConfiguration);
+
+ /**
+ * Return list of all jars that this particular connector needs to operate on
+ * following job. This method will be called after running initialize method.
+ * @param context Initializer context object
+ * @param linkConfiguration link configuration object
+ * @param jobConfiguration job configuration object for the FROM and TO
+ * In case of the FROM initializer this will represent the FROM job configuration
+ * In case of the TO initializer this will represent the TO job configuration
+ * @return
+ */
+ public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,
+ JobConfiguration jobConfiguration) {
+ return new LinkedList<String>();
+ }
+
+ /**
+ * Return schema associated with the connector for FROM and TO
+ * @param context Initializer context object
+ * @param linkConfiguration link configuration object
+ * @param jobConfiguration job configuration object for the FROM and TO
+ * In case of the FROM initializer this will represent the FROM job configuration
+ * In case of the TO initializer this will represent the TO job configuration
+ * @return
+ */
+
+ public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration,
+ JobConfiguration jobConfiguration);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
index cc32ada..3b6bd71 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
@@ -20,18 +20,17 @@ package org.apache.sqoop.job.etl;
/**
* This allows connector to load data into a target system.
*/
-public abstract class Loader<LinkConfiguration, JobConfiguration> {
+public abstract class Loader<LinkConfiguration, ToJobConfiguration> {
/**
* Load data to target.
*
* @param context Loader context object
* @param linkConfiguration link configuration object
- * @param jobConfiguration job configuration object
+ * @param jobConfiguration TO job configuration object
* @throws Exception
*/
- public abstract void load(LoaderContext context,
- LinkConfiguration linkConfiguration,
- JobConfiguration jobConfiguration) throws Exception;
+ public abstract void load(LoaderContext context, LinkConfiguration linkConfiguration,
+ ToJobConfiguration jobConfiguration) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
index 57507df..3636130 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
@@ -20,10 +20,10 @@ package org.apache.sqoop.job.etl;
import java.util.List;
/**
- * This allows connector to define how input data to be partitioned.
+ * This allows connector to define how input data from the FROM source can be partitioned.
* The number of data partitions also determines the degree of parallelism.
*/
-public abstract class Partitioner<LinkConfiguration, JobConfiguration> {
+public abstract class Partitioner<LinkConfiguration, FromJobConfiguration> {
/**
* Partition input data into partitions.
@@ -35,8 +35,6 @@ public abstract class Partitioner<LinkConfiguration, JobConfiguration> {
* @param jobConfiguration job configuration object
* @return
*/
- public abstract List<Partition> getPartitions(PartitionerContext context,
- LinkConfiguration linkConfiguration,
- JobConfiguration jobConfiguration);
-
+ public abstract List<Partition> getPartitions(PartitionerContext context,
+ LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration);
}