You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to commits@sqoop.apache.org by bo...@apache.org on 2017/10/24 12:42:42 UTC

sqoop git commit: SQOOP-3225: Mainframe module FTP listing parser should cater for larger datasets on disk

Repository: sqoop
Updated Branches:
  refs/heads/trunk 1c1905b08 -> e8588e243


SQOOP-3225: Mainframe module FTP listing parser should cater for larger datasets on disk

(Chris Teoh via Boglarka Egyed)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e8588e24
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e8588e24
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e8588e24

Branch: refs/heads/trunk
Commit: e8588e243ea3f158193b4decfe5884620ec0f021
Parents: 1c1905b
Author: Boglarka Egyed <bo...@apache.org>
Authored: Tue Oct 24 14:39:57 2017 +0200
Committer: Boglarka Egyed <bo...@apache.org>
Committed: Tue Oct 24 14:39:57 2017 +0200

----------------------------------------------------------------------
 .../sqoop/util/MainframeFTPClientUtils.java     | 17 ++++--
 .../sqoop/util/TestMainframeFTPClientUtils.java | 58 ++++++++++++++++++--
 2 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/e8588e24/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
index f61b983..95bc0ec 100644
--- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -57,6 +57,12 @@ public final class MainframeFTPClientUtils {
     String dsName = pdsName;
     String fileName = "";
     MainframeDatasetPath p = null;
+    String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+    if (dsType == null) {
+      // default dataset type to partitioned dataset
+      conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
+      dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+    }
     try {
     	p = new MainframeDatasetPath(dsName,conf);
     } catch (Exception e) {
@@ -64,8 +70,6 @@ public final class MainframeFTPClientUtils {
     	LOG.error("MainframeDatasetPath helper class incorrectly initialised");
     	e.printStackTrace();
     }
-    String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
-    boolean isTape = Boolean.parseBoolean(conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE));
     boolean isSequentialDs = false;
     boolean isGDG = false;
     if (dsType != null && p != null) {
@@ -80,7 +84,8 @@ public final class MainframeFTPClientUtils {
       if (ftp != null) {
         ftp.changeWorkingDirectory("'" + pdsName + "'");
         FTPFile[] ftpFiles = null;
-        if (isTape) {
+        if (!MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED.equals(dsType)) {
+          // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets
         	FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
         	List<FTPFile> listing = new ArrayList<FTPFile>();
         	while(parser.hasNext()) {
@@ -102,7 +107,11 @@ public final class MainframeFTPClientUtils {
         		}
         	}
         }
-		else { ftpFiles = ftp.listFiles(); }
+		else {
+      // partitioned datasets have a different FTP listing structure
+      LOG.info("Dataset is a partitioned dataset, using default FTP list parsing");
+      ftpFiles = ftp.listFiles();
+        }
 		if (!isGDG) {
 			for (FTPFile f : ftpFiles) {
 				LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType()));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e8588e24/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
index d87c75d..90a8519 100644
--- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
+++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
@@ -21,12 +21,14 @@ package org.apache.sqoop.util;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 
 import java.util.List;
 import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPListParseEngine;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sqoop.mapreduce.JobBase;
@@ -42,12 +44,14 @@ public class TestMainframeFTPClientUtils {
   private JobConf conf;
 
   private FTPClient mockFTPClient;
+  private FTPListParseEngine mockFTPListParseEngine;
 
   @Before
   public void setUp() {
     conf = new JobConf();
     mockFTPClient = mock(FTPClient.class);
     when(mockFTPClient.getReplyString()).thenReturn("");
+    mockFTPListParseEngine = mock(FTPListParseEngine.class);
     MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
   }
 
@@ -173,7 +177,9 @@ public class TestMainframeFTPClientUtils {
         FTPFile file2 = new FTPFile();
         file2.setName("blah2");
         file2.setType(FTPFile.FILE_TYPE);
-      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[] {file1,file2});
+      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+      when(mockFTPListParseEngine.hasNext()).thenReturn(true,false);
+      when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
@@ -210,7 +216,9 @@ public class TestMainframeFTPClientUtils {
         FTPFile file2 = new FTPFile();
         file2.setName("blah2");
         file2.setType(FTPFile.FILE_TYPE);
-      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+      when(mockFTPListParseEngine.hasNext()).thenReturn(true,false);
+      when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
@@ -248,7 +256,9 @@ public class TestMainframeFTPClientUtils {
         FTPFile file2 = new FTPFile();
         file2.setName("blah2");
         file2.setType(FTPFile.FILE_TYPE);
-      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+      when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
+      when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
@@ -287,7 +297,9 @@ public class TestMainframeFTPClientUtils {
 	        FTPFile file2 = new FTPFile();
 	        file2.setName("G0101V00");
 	        file2.setType(FTPFile.FILE_TYPE);
-	      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+        when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+        when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
+        when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
 	    } catch (IOException e) {
 	      fail("No IOException should be thrown!");
 	    }
@@ -312,4 +324,42 @@ public class TestMainframeFTPClientUtils {
 			Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString);
 	    }
   }
+
+  @Test
+  public void testPartitionedDatasetsShouldReturnAllFiles() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+      when(mockFTPClient.changeWorkingDirectory("a.b.c.blah1")).thenReturn(true);
+      FTPFile file1 = new FTPFile();
+      file1.setName("blah1");
+      file1.setType(FTPFile.FILE_TYPE);
+      FTPFile file2 = new FTPFile();
+      file2.setName("blah2");
+      file2.setType(FTPFile.FILE_TYPE);
+      // initiateListParsing should not be called here as it is a partitioned dataset and default to listFiles()
+      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"p");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, "pssword".getBytes());
+    try {
+      String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+      List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
+      Assert.assertTrue(files != null && files.size() == 2);
+      verify(mockFTPClient).listFiles();
+    } catch (IOException ioe) {
+      String ioeString = ioe.getMessage();
+      Assert.fail(ioeString);
+    }
+  }
 }