Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 04:51:33 UTC
[04/52] [abbrv] git commit: SQOOP-1487: Sqoop2: From/To: Refactor/Create HDFS connector test cases
SQOOP-1487: Sqoop2: From/To: Refactor/Create HDFS connector test cases
(Abraham Elmahrek via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c0b22b1d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c0b22b1d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c0b22b1d
Branch: refs/heads/SQOOP-1367
Commit: c0b22b1d62221f2a24294520b8ff3851c06d06c3
Parents: 8fee134
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Sep 3 09:40:11 2014 +0200
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Thu Oct 9 17:58:17 2014 -0700
----------------------------------------------------------------------
connector/connector-hdfs/pom.xml | 5 +
.../sqoop/connector/hdfs/HdfsExtractor.java | 2 -
.../apache/sqoop/connector/hdfs/FileUtils.java | 82 ++++++
.../sqoop/connector/hdfs/TestExtractor.java | 125 ++++++++++
.../sqoop/connector/hdfs/TestHdfsBase.java | 139 +++++++++++
.../apache/sqoop/connector/hdfs/TestLoader.java | 213 ++++++++++++++++
.../sqoop/connector/hdfs/TestPartitioner.java | 113 +++++++++
.../src/test/resources/log4j.properties | 24 ++
.../java/org/apache/sqoop/job/FileUtils.java | 69 -----
.../org/apache/sqoop/job/TestHdfsExtract.java | 241 ------------------
.../java/org/apache/sqoop/job/TestHdfsLoad.java | 250 -------------------
11 files changed, 701 insertions(+), 562 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 8df9f11..fa4330a 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -35,6 +35,11 @@ limitations under the License.
<dependencies>
<dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index fc12381..7447071 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -42,8 +42,6 @@ import java.io.IOException;
* Extract from HDFS.
* Default field delimiter of a record is comma.
*/
-
-
public class HdfsExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, HdfsPartition> {
public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
new file mode 100644
index 0000000..8c19d01
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+public class FileUtils {
+
+ public static boolean exists(String file) throws IOException {
+ Path path = new Path(file);
+ FileSystem fs = path.getFileSystem(new Configuration());
+ return fs.exists(path);
+ }
+
+ public static void delete(String file) throws IOException {
+ Path path = new Path(file);
+ FileSystem fs = path.getFileSystem(new Configuration());
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ }
+
+ public static void mkdirs(String directory) throws IOException {
+ Path path = new Path(directory);
+ FileSystem fs = path.getFileSystem(new Configuration());
+ if (!fs.exists(path)) {
+ fs.mkdirs(path);
+ }
+ }
+
+ public static InputStream open(String fileName)
+ throws IOException, ClassNotFoundException {
+ Path filepath = new Path(fileName);
+ FileSystem fs = filepath.getFileSystem(new Configuration());
+ return fs.open(filepath);
+ }
+
+ public static OutputStream create(String fileName) throws IOException {
+ Path filepath = new Path(fileName);
+ FileSystem fs = filepath.getFileSystem(new Configuration());
+ return fs.create(filepath, false);
+ }
+
+ public static Path[] listDir(String directory) throws IOException {
+ Path dirpath = new Path(directory);
+ FileSystem fs = dirpath.getFileSystem(new Configuration());
+ List<Path> paths = new LinkedList<Path>();
+ for (FileStatus fileStatus : fs.listStatus(dirpath)) {
+ paths.add(fileStatus.getPath());
+ }
+ return paths.toArray(new Path[paths.size()]);
+ }
+
+ private FileUtils() {
+ // Disable explicit object creation
+ }
+
+}
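
For context, a minimal sketch of how the FileUtils test helper above might be exercised on its own; the FileUtilsExample class name and the /tmp scratch path are illustrative, not part of the commit.

import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.FileUtils;

import java.io.OutputStream;

public class FileUtilsExample {
  public static void main(String[] args) throws Exception {
    String dir = "/tmp/sqoop-fileutils-demo"; // illustrative scratch directory
    FileUtils.mkdirs(dir);

    // create() passes overwrite=false to fs.create(), so re-creating an
    // existing file fails rather than silently truncating it.
    OutputStream out = FileUtils.create(dir + "/sample.txt");
    out.write("1,1.0,'1'\n".getBytes("UTF-8"));
    out.close();

    for (Path p : FileUtils.listDir(dir)) {
      System.out.println(p + " exists: " + FileUtils.exists(p.toString()));
    }

    FileUtils.delete(dir); // recursive delete, mirrors fs.delete(path, true)
  }
}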
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
new file mode 100644
index 0000000..6ed4087
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE;
+
+@RunWith(Parameterized.class)
+public class TestExtractor extends TestHdfsBase {
+ private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+ private static final int NUMBER_OF_FILES = 5;
+ private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+ private OutputFormat outputFileType;
+ private Class<? extends CompressionCodec> compressionClass;
+ private final String inputDirectory;
+ private Extractor extractor;
+
+ public TestExtractor(OutputFormat outputFileType,
+ Class<? extends CompressionCodec> compressionClass)
+ throws Exception {
+ this.inputDirectory = INPUT_ROOT + getClass().getSimpleName();
+ this.outputFileType = outputFileType;
+ this.compressionClass = compressionClass;
+ this.extractor = new HdfsExtractor();
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ List<Object[]> parameters = new ArrayList<Object[]>();
+ for (Class<?> compressionClass : new Class<?>[]{null, DefaultCodec.class, BZip2Codec.class}) {
+ for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+ parameters.add(new Object[]{outputFileType, compressionClass});
+ }
+ }
+ return parameters;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ FileUtils.mkdirs(inputDirectory);
+ switch (this.outputFileType) {
+ case TEXT_FILE:
+ createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+ break;
+
+ case SEQUENCE_FILE:
+ createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+ break;
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.delete(inputDirectory);
+ }
+
+ @Test
+ public void testExtractor() throws Exception {
+ Configuration conf = new Configuration();
+ PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+ ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
+ private long index = 1L;
+
+ @Override
+ public void writeArrayRecord(Object[] array) {
+ throw new AssertionError("Should not be writing array.");
+ }
+
+ @Override
+ public void writeStringRecord(String text) {
+ Assert.assertEquals(index + "," + index + ".0,'" + index++ + "'", text);
+ }
+
+ @Override
+ public void writeRecord(Object obj) {
+ throw new AssertionError("Should not be writing object.");
+ }
+ }, null);
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ FromJobConfiguration jobConf = new FromJobConfiguration();
+
+ HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
+
+ extractor.extract(context, connConf, jobConf, partition);
+ }
+}
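
TestExtractor runs once per (output format, compression codec) pair because of JUnit's Parameterized runner: each Object[] returned by the @Parameterized.Parameters method becomes one constructor invocation, and every @Test method executes once per row. A self-contained sketch of the mechanism; the SquareTest class and its data are illustrative only.

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

@RunWith(Parameterized.class)
public class SquareTest {
  private final int input;
  private final int expected;

  // Each row from data() is passed here; testSquare() then runs per row.
  public SquareTest(int input, int expected) {
    this.input = input;
    this.expected = expected;
  }

  @Parameterized.Parameters
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][]{{1, 1}, {2, 4}, {3, 9}});
  }

  @Test
  public void testSquare() {
    Assert.assertEquals(expected, input * input);
  }
}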
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
new file mode 100644
index 0000000..0cc2b8b
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+public class TestHdfsBase {
+
+ protected HdfsPartition createPartition(Path[] paths) throws IOException {
+ long[] offsets = new long[paths.length];
+ long[] lengths = new long[paths.length];
+ String[] locations = new String[paths.length];
+ FileSystem fs = FileSystem.get(new Configuration());
+
+ for (int i = 0; i < offsets.length; ++i) {
+ locations[i] = paths[i].getName();
+ lengths[i] = fs.getFileStatus(paths[i]).getLen();
+ }
+
+ return new HdfsPartition(paths, offsets, lengths, locations);
+ }
+
+ protected void createTextInput(String indir,
+ Class<? extends CompressionCodec> clz,
+ int numberOfFiles,
+ int numberOfRows)
+ throws IOException, InstantiationException, IllegalAccessException {
+ Configuration conf = new Configuration();
+
+ CompressionCodec codec = null;
+ String extension = "";
+ if (clz != null) {
+ codec = clz.newInstance();
+ if (codec instanceof Configurable) {
+ ((Configurable) codec).setConf(conf);
+ }
+ extension = codec.getDefaultExtension();
+ }
+
+ int index = 1;
+ for (int fi = 0; fi < numberOfFiles; fi++) {
+ String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
+ OutputStream filestream = FileUtils.create(fileName);
+ BufferedWriter filewriter;
+ if (codec != null) {
+ filewriter = new BufferedWriter(new OutputStreamWriter(
+ codec.createOutputStream(filestream, codec.createCompressor()),
+ "UTF-8"));
+ } else {
+ filewriter = new BufferedWriter(new OutputStreamWriter(
+ filestream, "UTF-8"));
+ }
+
+ for (int ri = 0; ri < numberOfRows; ri++) {
+ String row = index + "," + (double)index + ",'" + index + "'";
+ filewriter.write(row + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+ index++;
+ }
+
+ filewriter.close();
+ }
+ }
+
+ protected void createSequenceInput(String indir,
+ Class<? extends CompressionCodec> clz,
+ int numberOfFiles,
+ int numberOfRows)
+ throws IOException, InstantiationException, IllegalAccessException {
+ Configuration conf = new Configuration();
+
+ CompressionCodec codec = null;
+ if (clz != null) {
+ codec = clz.newInstance();
+ if (codec instanceof Configurable) {
+ ((Configurable) codec).setConf(conf);
+ }
+ }
+
+ int index = 1;
+ for (int fi = 0; fi < numberOfFiles; fi++) {
+ Path filepath = new Path(indir,
+ "part-r-" + padZeros(fi, 5) + ".seq");
+ SequenceFile.Writer filewriter;
+ if (codec != null) {
+ filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+ conf, filepath, Text.class, NullWritable.class,
+ SequenceFile.CompressionType.BLOCK, codec);
+ } else {
+ filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+ conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE);
+ }
+
+ Text text = new Text();
+ for (int ri = 0; ri < numberOfRows; ri++) {
+ String row = index + "," + (double)index + ",'" + index + "'";
+ text.set(row);
+ filewriter.append(text, NullWritable.get());
+ index++;
+ }
+
+ filewriter.close();
+ }
+ }
+
+ private String padZeros(int number, int digits) {
+ String string = String.valueOf(number);
+ for (int i = (digits - string.length()); i > 0; i--) {
+ string = "0" + string;
+ }
+ return string;
+ }
+}
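
The hand-rolled padZeros() above could equally be written with String.format's zero-flag width; a minimal equivalent sketch, with an illustrative class name.

public class PadZerosAlternative {
  // %0<digits>d left-pads with zeros, matching padZeros(number, digits).
  static String padZeros(int number, int digits) {
    return String.format("%0" + digits + "d", number);
  }

  public static void main(String[] args) {
    System.out.println(padZeros(3, 5)); // prints 00003
  }
}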
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
new file mode 100644
index 0000000..79cf1f1
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputCompression;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE;
+
+@RunWith(Parameterized.class)
+public class TestLoader extends TestHdfsBase {
+ private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+ private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+ private OutputFormat outputFormat;
+ private OutputCompression compression;
+ private final String outputDirectory;
+ private Loader loader;
+
+ public TestLoader(OutputFormat outputFormat,
+ OutputCompression compression)
+ throws Exception {
+ this.outputDirectory = INPUT_ROOT + getClass().getSimpleName();
+ this.outputFormat = outputFormat;
+ this.compression = compression;
+ this.loader = new HdfsLoader();
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ List<Object[]> parameters = new ArrayList<Object[]>();
+ for (OutputCompression compression : new OutputCompression[]{
+ OutputCompression.DEFAULT,
+ OutputCompression.BZIP2,
+ OutputCompression.NONE
+ }) {
+ for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+ parameters.add(new Object[]{outputFileType, compression});
+ }
+ }
+ return parameters;
+ }
+
+ @Before
+ public void setUp() throws Exception {}
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.delete(outputDirectory);
+ }
+
+ @Test
+ public void testLoader() throws Exception {
+ FileSystem fs = FileSystem.get(new Configuration());
+
+ Configuration conf = new Configuration();
+ PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+ LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
+ private long index = 0L;
+
+ @Override
+ public Object[] readArrayRecord() {
+ return null;
+ }
+
+ @Override
+ public String readTextRecord() {
+ if (index++ < NUMBER_OF_ROWS_PER_FILE) {
+ return index + "," + (double)index + ",'" + index + "'";
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Object readContent() {
+ return null;
+ }
+ }, null);
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ToJobConfiguration jobConf = new ToJobConfiguration();
+ jobConf.output.outputDirectory = outputDirectory;
+ jobConf.output.compression = compression;
+ jobConf.output.outputFormat = outputFormat;
+ Path outputPath = new Path(outputDirectory);
+
+ loader.load(context, connConf, jobConf);
+ Assert.assertEquals(1, fs.listStatus(outputPath).length);
+
+ for (FileStatus status : fs.listStatus(outputPath)) {
+ verifyOutput(fs, status.getPath());
+ }
+
+ loader.load(context, connConf, jobConf);
+ Assert.assertEquals(2, fs.listStatus(outputPath).length);
+ loader.load(context, connConf, jobConf);
+ loader.load(context, connConf, jobConf);
+ loader.load(context, connConf, jobConf);
+ Assert.assertEquals(5, fs.listStatus(outputPath).length);
+ }
+
+ private void verifyOutput(FileSystem fs, Path file) throws IOException {
+ Configuration conf = new Configuration();
+ FSDataInputStream fsin = fs.open(file);
+ CompressionCodec codec;
+
+ switch(outputFormat) {
+ case TEXT_FILE:
+ codec = (new CompressionCodecFactory(conf)).getCodec(file);
+
+ // Verify compression
+ switch(compression) {
+ case BZIP2:
+ Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
+ break;
+
+ case DEFAULT:
+ Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Deflate") != -1);
+ break;
+
+ case NONE:
+ default:
+ Assert.assertNull(codec);
+ break;
+ }
+
+ InputStreamReader in;
+ if (codec == null) {
+ in = new InputStreamReader(fsin);
+ } else {
+ in = new InputStreamReader(codec.createInputStream(fsin, codec.createDecompressor()));
+ }
+ BufferedReader textReader = new BufferedReader(in);
+
+ for (int i = 1; i <= NUMBER_OF_ROWS_PER_FILE; ++i) {
+ Assert.assertEquals(i + "," + (double)i + ",'" + i + "'", textReader.readLine());
+ }
+ break;
+
+ case SEQUENCE_FILE:
+ SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, file, conf);
+ codec = sequenceReader.getCompressionCodec();
+
+ // Verify compression
+ switch(compression) {
+ case BZIP2:
+ Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
+ break;
+
+ case DEFAULT:
+ Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1);
+ break;
+
+ case NONE:
+ default:
+ Assert.assertNull(codec);
+ break;
+ }
+
+ Text line = new Text();
+ int index = 1;
+ while (sequenceReader.next(line)) {
+ Assert.assertEquals(index + "," + (double)index + ",'" + index++ + "'", line.toString());
+ line = new Text();
+ }
+ break;
+ }
+ }
+}
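
For TEXT_FILE output, verifyOutput() above resolves the codec purely from the file name extension via CompressionCodecFactory. A standalone sketch of that lookup; the class and file names are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupExample {
  public static void main(String[] args) {
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

    // ".bz2" maps to BZip2Codec; an unknown extension yields null,
    // which is what the NONE branch of verifyOutput() asserts.
    CompressionCodec codec = factory.getCodec(new Path("part-00000.bz2"));
    System.out.println(codec == null ? "no codec" : codec.getClass().getCanonicalName());
    System.out.println(factory.getCodec(new Path("part-00000")) == null); // true
  }
}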
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
new file mode 100644
index 0000000..ae93b0a
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.*;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestPartitioner extends TestHdfsBase {
+ private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+ private static final int NUMBER_OF_FILES = 5;
+ private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+ private OutputFormat outputFileType;
+ private Class<? extends CompressionCodec> compressionClass;
+ private Partitioner partitioner;
+
+ private final String inputDirectory;
+
+ public TestPartitioner(OutputFormat outputFileType, Class<? extends CompressionCodec> compressionClass) {
+ this.inputDirectory = INPUT_ROOT + getClass().getSimpleName();
+ this.outputFileType = outputFileType;
+ this.compressionClass = compressionClass;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ partitioner = new HdfsPartitioner();
+ FileUtils.mkdirs(inputDirectory);
+
+ switch (this.outputFileType) {
+ case TEXT_FILE:
+ createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+ break;
+
+ case SEQUENCE_FILE:
+ createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+ break;
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.delete(inputDirectory);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ List<Object[]> parameters = new ArrayList<Object[]>();
+ for (Class<?> compressionClass : new Class<?>[]{null, DefaultCodec.class, BZip2Codec.class}) {
+ for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+ parameters.add(new Object[]{outputFileType, compressionClass});
+ }
+ }
+ return parameters;
+ }
+
+ @Test
+ public void testPartitioner() {
+ Configuration conf = new Configuration();
+ PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+ PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ FromJobConfiguration jobConf = new FromJobConfiguration();
+
+ jobConf.input.inputDirectory = inputDirectory;
+
+ List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+
+ if (this.compressionClass == null) {
+ assertEquals(5, partitions.size());
+ } else {
+ assertEquals(3, partitions.size());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/resources/log4j.properties b/connector/connector-hdfs/src/test/resources/log4j.properties
new file mode 100644
index 0000000..44ffced
--- /dev/null
+++ b/connector/connector-hdfs/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
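
With this properties file on the test classpath, a logger obtained as in HdfsExtractor emits DEBUG output to the console through appender A1. A minimal sketch, with an illustrative class name.

import org.apache.log4j.Logger;

public class LoggingExample {
  private static final Logger LOG = Logger.getLogger(LoggingExample.class);

  public static void main(String[] args) {
    LOG.debug("visible because the root logger level is DEBUG");
    LOG.info("rendered by PatternLayout: %-4r [%t] %-5p %c %x - %m%n");
  }
}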
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
deleted file mode 100644
index e685883..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class FileUtils {
-
- public static boolean exists(String file) throws IOException {
- Path path = new Path(file);
- FileSystem fs = path.getFileSystem(new Configuration());
- return fs.exists(path);
- }
-
- public static void delete(String file) throws IOException {
- Path path = new Path(file);
- FileSystem fs = path.getFileSystem(new Configuration());
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- }
-
- public static void mkdirs(String directory) throws IOException {
- Path path = new Path(directory);
- FileSystem fs = path.getFileSystem(new Configuration());
- if (!fs.exists(path)) {
- fs.mkdirs(path);
- }
- }
-
- public static InputStream open(String fileName)
- throws IOException, ClassNotFoundException {
- Path filepath = new Path(fileName);
- FileSystem fs = filepath.getFileSystem(new Configuration());
- return fs.open(filepath);
- }
-
- public static OutputStream create(String fileName) throws IOException {
- Path filepath = new Path(fileName);
- FileSystem fs = filepath.getFileSystem(new Configuration());
- return fs.create(filepath, false);
- }
-
- private FileUtils() {
- // Disable explicit object creation
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
deleted file mode 100644
index 2accf77..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import junit.framework.TestCase;
-
-//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-//import org.apache.sqoop.job.etl.HdfsExportExtractor;
-//import org.apache.sqoop.job.etl.HdfsExportPartitioner;
-
-public class TestHdfsExtract extends TestCase {
-
-// private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
-// private static final int NUMBER_OF_FILES = 5;
-// private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
-//
-// private final String indir;
-//
-// public TestHdfsExtract() {
-// indir = INPUT_ROOT + getClass().getSimpleName();
-// }
-//
-// @Override
-// public void setUp() throws IOException {
-// FileUtils.mkdirs(indir);
-// }
-//
-// @Override
-// public void tearDown() throws IOException {
-// FileUtils.delete(indir);
-// }
-//
-// /**
-// * Test case for validating the number of partitions creation
-// * based on input.
-// * Success if the partitions list size is less or equal to
-// * given max partition.
-// * @throws Exception
-// */
-// @Test
-// public void testHdfsExportPartitioner() throws Exception {
-// createTextInput(null);
-// Configuration conf = new Configuration();
-// conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-//
-// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-// CSVIntermediateDataFormat.class.getName());
-// HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
-// PrefixContext prefixContext = new PrefixContext(conf, "");
-// int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
-//
-// for(int maxPartitions : partitionValues) {
-// PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions, null);
-// List<Partition> partitionList = partitioner.getPartitions(partCont, null, null);
-// assertTrue(partitionList.size()<=maxPartitions);
-// }
-// }
-//
-// @Test
-// public void testUncompressedText() throws Exception {
-// createTextInput(null);
-//
-// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-// }
-//
-// @Test
-// public void testDefaultCompressedText() throws Exception {
-// createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
-//
-// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-// }
-//
-// @Test
-// public void testBZip2CompressedText() throws Exception {
-// createTextInput(BZip2Codec.class);
-//
-// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-// }
-//
-// @Test
-// public void testDefaultCompressedSequence() throws Exception {
-// createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
-//
-// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-// }
-//
-// @Test
-// public void testUncompressedSequence() throws Exception {
-// createSequenceInput(null);
-//
-// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-// }
-//
-// private Schema createSchema() {
-// Schema schema = new Schema("Test");
-// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-// return schema;
-// }
-//
-// private Configuration createConf() {
-// Configuration conf = new Configuration();
-// ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-// conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER,
-// HdfsExportPartitioner.class.getName());
-// conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR,
-// HdfsExportExtractor.class.getName());
-// conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-// conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-// conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT,
-// CSVIntermediateDataFormat.class.getName());
-// conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir);
-// return conf;
-// }
-//
-// private Job createJob(Configuration conf, Schema schema) throws Exception {
-// Job job = new Job(conf);
-// ConfigurationUtils.setConnectorSchema(job, schema);
-// job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-// CSVIntermediateDataFormat.class.getName());
-// return job;
-// }
-//
-// private void createTextInput(Class<? extends CompressionCodec> clz)
-// throws IOException, InstantiationException, IllegalAccessException {
-// Configuration conf = new Configuration();
-//
-// CompressionCodec codec = null;
-// String extension = "";
-// if (clz != null) {
-// codec = clz.newInstance();
-// if (codec instanceof Configurable) {
-// ((Configurable) codec).setConf(conf);
-// }
-// extension = codec.getDefaultExtension();
-// }
-//
-// int index = 1;
-// for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
-// String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
-// OutputStream filestream = FileUtils.create(fileName);
-// BufferedWriter filewriter;
-// if (codec != null) {
-// filewriter = new BufferedWriter(new OutputStreamWriter(
-// codec.createOutputStream(filestream, codec.createCompressor()),
-// Data.CHARSET_NAME));
-// } else {
-// filewriter = new BufferedWriter(new OutputStreamWriter(
-// filestream, Data.CHARSET_NAME));
-// }
-//
-// for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
-// String row = index + "," + (double)index + ",'" + index + "'";
-// filewriter.write(row + Data.DEFAULT_RECORD_DELIMITER);
-// index++;
-// }
-//
-// filewriter.close();
-// }
-// }
-//
-// private void createSequenceInput(Class<? extends CompressionCodec> clz)
-// throws IOException, InstantiationException, IllegalAccessException {
-// Configuration conf = new Configuration();
-//
-// CompressionCodec codec = null;
-// if (clz != null) {
-// codec = clz.newInstance();
-// if (codec instanceof Configurable) {
-// ((Configurable) codec).setConf(conf);
-// }
-// }
-//
-// int index = 1;
-// for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
-// Path filepath = new Path(indir,
-// "part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
-// SequenceFile.Writer filewriter;
-// if (codec != null) {
-// filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-// conf, filepath, Text.class, NullWritable.class,
-// CompressionType.BLOCK, codec);
-// } else {
-// filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-// conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
-// }
-//
-// Text text = new Text();
-// for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
-// String row = index + "," + (double)index + ",'" + index + "'";
-// text.set(row);
-// filewriter.append(text, NullWritable.get());
-// index++;
-// }
-//
-// filewriter.close();
-// }
-// }
-//
-// private String padZeros(int number, int digits) {
-// String string = String.valueOf(number);
-// for (int i=(digits-string.length()); i>0; i--) {
-// string = "0" + string;
-// }
-// return string;
-// }
-//
-// public static class DummyLoader extends Loader {
-// @Override
-// public void load(LoaderContext context, Object oc, Object oj) throws Exception {
-// int index = 1;
-// int sum = 0;
-// Object[] array;
-// while ((array = context.getDataReader().readArrayRecord()) != null) {
-// sum += Integer.valueOf(array[0].toString());
-// index++;
-// };
-//
-// int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
-// assertEquals((1+numbers)*numbers/2, sum);
-//
-// assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
-// }
-// }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
deleted file mode 100644
index 8eba049..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import junit.framework.TestCase;
-
-//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-//import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-//import org.apache.sqoop.job.etl.HdfsTextImportLoader;
-
-public class TestHdfsLoad extends TestCase {
-
-// private static final String OUTPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
-// private static final String OUTPUT_FILE = "part-r-00000";
-// private static final int START_ID = 1;
-// private static final int NUMBER_OF_IDS = 9;
-// private static final int NUMBER_OF_ROWS_PER_ID = 10;
-//
-// private String outdir;
-//
-// public TestHdfsLoad() {
-// outdir = OUTPUT_ROOT + "/" + getClass().getSimpleName();
-// }
-//
-// public void testUncompressedText() throws Exception {
-// FileUtils.delete(outdir);
-//
-// Configuration conf = new Configuration();
-// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-// conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-// CSVIntermediateDataFormat.class.getName());
-// conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-// Schema schema = new Schema("Test");
-// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-// Job job = new Job(conf);
-// ConfigurationUtils.setConnectorSchema(job, schema);
-// JobUtils.runJob(job.getConfiguration());
-//
-// String fileName = outdir + "/" + OUTPUT_FILE;
-// InputStream filestream = FileUtils.open(fileName);
-// BufferedReader filereader = new BufferedReader(new InputStreamReader(
-// filestream, Charsets.UTF_8));
-// verifyOutputText(filereader);
-// }
-//
-// public void testCompressedText() throws Exception {
-// FileUtils.delete(outdir);
-//
-// Configuration conf = new Configuration();
-// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-// conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-// CSVIntermediateDataFormat.class.getName());
-// conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-// conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-//
-// Schema schema = new Schema("Test");
-// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-// Job job = new Job(conf);
-// ConfigurationUtils.setConnectorSchema(job, schema);
-// JobUtils.runJob(job.getConfiguration());
-//
-// Class<? extends CompressionCodec> codecClass = conf.getClass(
-// JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
-// .asSubclass(CompressionCodec.class);
-// CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
-// String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension();
-// InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
-// BufferedReader filereader = new BufferedReader(new InputStreamReader(
-// filestream, Charsets.UTF_8));
-// verifyOutputText(filereader);
-// }
-//
-// private void verifyOutputText(BufferedReader reader) throws IOException {
-// String actual = null;
-// String expected;
-// Data data = new Data();
-// int index = START_ID*NUMBER_OF_ROWS_PER_ID;
-// while ((actual = reader.readLine()) != null){
-// data.setContent(new Object[] {
-// index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
-// Data.ARRAY_RECORD);
-// expected = data.toString();
-// index++;
-//
-// assertEquals(expected, actual);
-// }
-// reader.close();
-//
-// assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
-// index-START_ID*NUMBER_OF_ROWS_PER_ID);
-// }
-//
-// public void testUncompressedSequence() throws Exception {
-// FileUtils.delete(outdir);
-//
-// Configuration conf = new Configuration();
-// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-// conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-// CSVIntermediateDataFormat.class.getName());
-// conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-//
-// Schema schema = new Schema("Test");
-// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-// Job job = new Job(conf);
-// ConfigurationUtils.setConnectorSchema(job, schema);
-// JobUtils.runJob(job.getConfiguration());
-//
-// Path filepath = new Path(outdir,
-// OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
-// SequenceFile.Reader filereader = new SequenceFile.Reader(
-// filepath.getFileSystem(conf), filepath, conf);
-// verifyOutputSequence(filereader);
-// }
-//
-// public void testCompressedSequence() throws Exception {
-// FileUtils.delete(outdir);
-//
-// Configuration conf = new Configuration();
-// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-// conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-// CSVIntermediateDataFormat.class.getName());
-// conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-// conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-//
-// Schema schema = new Schema("Test");
-// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-// Job job = new Job(conf);
-// ConfigurationUtils.setConnectorSchema(job, schema);
-// JobUtils.runJob(job.getConfiguration());
-// Path filepath = new Path(outdir,
-// OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
-// SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf);
-// verifyOutputSequence(filereader);
-// }
-//
-// private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
-// int index = START_ID*NUMBER_OF_ROWS_PER_ID;
-// Text actual = new Text();
-// Text expected = new Text();
-// Data data = new Data();
-// while (reader.next(actual)){
-// data.setContent(new Object[] {
-// index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
-// Data.ARRAY_RECORD);
-// expected.set(data.toString());
-// index++;
-//
-// assertEquals(expected.toString(), actual.toString());
-// }
-// reader.close();
-//
-// assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
-// index-START_ID*NUMBER_OF_ROWS_PER_ID);
-// }
-//
-// public static class DummyPartition extends Partition {
-// private int id;
-//
-// public void setId(int id) {
-// this.id = id;
-// }
-//
-// public int getId() {
-// return id;
-// }
-//
-// @Override
-// public void readFields(DataInput in) throws IOException {
-// id = in.readInt();
-// }
-//
-// @Override
-// public void write(DataOutput out) throws IOException {
-// out.writeInt(id);
-// }
-//
-// @Override
-// public String toString() {
-// return Integer.toString(id);
-// }
-// }
-//
-// public static class DummyPartitioner extends Partitioner {
-// @Override
-// public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
-// List<Partition> partitions = new LinkedList<Partition>();
-// for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
-// DummyPartition partition = new DummyPartition();
-// partition.setId(id);
-// partitions.add(partition);
-// }
-// return partitions;
-// }
-// }
-//
-// public static class DummyExtractor extends Extractor {
-// @Override
-// public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
-// int id = ((DummyPartition)partition).getId();
-// for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
-// Object[] array = new Object[] {
-// id * NUMBER_OF_ROWS_PER_ID + row,
-// (double) (id * NUMBER_OF_ROWS_PER_ID + row),
-// new String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1)
-// };
-// context.getDataWriter().writeArrayRecord(array);
-// }
-// }
-//
-// @Override
-// public long getRowsRead() {
-// return NUMBER_OF_ROWS_PER_ID;
-// }
-// }
-}