Posted to commits@falcon.apache.org by aj...@apache.org on 2015/11/17 13:38:32 UTC

falcon git commit: FALCON-1486 Add Unit Test cases for HiveDR. Contributed by Peeyush Bishnoi.

Repository: falcon
Updated Branches:
  refs/heads/master af2ea867d -> 2d5516ec3


FALCON-1486 Add Unit Test cases for HiveDR. Contributed by Peeyush Bishnoi.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2d5516ec
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2d5516ec
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2d5516ec

Branch: refs/heads/master
Commit: 2d5516ec374333a7b39de7302c5cd511ba6ba9c8
Parents: af2ea86
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Nov 17 16:10:38 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Nov 17 16:10:38 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 addons/hivedr/pom.xml                           |  12 +
 .../java/org/apache/falcon/hive/HiveDRTool.java |   9 +-
 .../falcon/hive/MetaStoreEventSourcer.java      |  20 +-
 .../java/org/apache/falcon/hive/HiveDRTest.java | 252 +++++++++++++++++++
 pom.xml                                         |   1 +
 6 files changed, 284 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 612b3c9..18cf582 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,8 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1486 Add Unit Test cases for HiveDR(Peeyush Bishnoi via Ajay Yadava)
+
     FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava via Pallavi Rao)
 
     FALCON-1593 Oozie setup failing in setup phase (Praveen Adlakha via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
index c887166..f98e8c4 100644
--- a/addons/hivedr/pom.xml
+++ b/addons/hivedr/pom.xml
@@ -183,6 +183,18 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemProperties>
+                        <property>
+                            <name>derby.stream.error.file</name>
+                            <value>target/derby.log</value>
+                        </property>
+                    </systemProperties>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 

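The surefire settings above matter because the new HiveDRTest boots an embedded metastore backed by Derby; without the override, Derby drops a derby.log file into the module's working directory. The same redirect can be done programmatically. A minimal sketch, assuming only Derby's standard derby.stream.error.file property (the helper class itself is hypothetical):

    // Hypothetical helper: route Derby's error stream into target/,
    // equivalent to the surefire <systemProperties> entry above.
    public final class DerbyLogSetup {
        private DerbyLogSetup() {
        }

        public static void redirectDerbyLog() {
            // Derby reads this standard system property at boot time.
            System.setProperty("derby.stream.error.file", "target/derby.log");
        }
    }
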
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index c3d9b5c..df16c40 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -30,6 +30,7 @@ import org.apache.falcon.hive.util.EventSourcerUtils;
 import org.apache.falcon.hive.util.FileUtils;
 import org.apache.falcon.hive.util.HiveDRStatusStore;
 import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.HiveMetastoreUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.api.HCatClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -257,8 +259,11 @@ public class HiveDRTool extends Configured implements Tool {
                 +inputOptions.getJobName()+".id";
         Map<String, Long> lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile));
         try {
-            defaultSourcer = new MetaStoreEventSourcer(inputOptions.getSourceMetastoreUri(),
-                    inputOptions.getSourceMetastoreKerberosPrincipal(), inputOptions.getSourceHive2KerberosPrincipal(),
+            HCatClient sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(
+                    inputOptions.getSourceMetastoreUri(),
+                    inputOptions.getSourceMetastoreKerberosPrincipal(),
+                    inputOptions.getSourceHive2KerberosPrincipal());
+            defaultSourcer = new MetaStoreEventSourcer(sourceMetastoreClient,
                     new DefaultPartitioner(drStore, eventSoucerUtil), eventSoucerUtil, lastEventsIdMap);
             inputFilename = defaultSourcer.sourceEvents(inputOptions);
         } finally {

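The HiveDRTool change above is the seam that makes the new tests possible: client construction moves out of MetaStoreEventSourcer and into the tool, which builds an HCatClient via HiveMetastoreUtils and injects it through the constructor. A test can therefore supply a client bound to an embedded metastore instead of a Kerberized production one. A minimal wiring sketch, assuming only the constructor signature visible in this diff (the sketch class sits in the same org.apache.falcon.hive package):

    import org.apache.falcon.hive.util.EventSourcerUtils;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.hcatalog.api.HCatClient;

    import java.util.Collections;

    // Sketch: the sourcer no longer builds its own client, so tests
    // can hand in one that talks to an embedded metastore.
    class SourcerWiringSketch {
        static MetaStoreEventSourcer newTestSourcer(EventSourcerUtils utils) throws Exception {
            HCatClient client = HCatClient.create(new HiveConf()); // embedded metastore
            return new MetaStoreEventSourcer(client, /* partitioner */ null,
                    utils, /* lastEventsIdMap */ Collections.<String, Long>emptyMap());
        }
    }
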
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
index 0e12e89..f008883 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
@@ -21,7 +21,6 @@ package org.apache.falcon.hive;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.hive.util.EventSourcerUtils;
 import org.apache.falcon.hive.util.HiveDRUtils;
-import org.apache.falcon.hive.util.HiveMetastoreUtils;
 import org.apache.hive.hcatalog.api.HCatClient;
 import org.apache.hive.hcatalog.api.repl.ReplicationTask;
 import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
@@ -49,14 +48,11 @@ public class MetaStoreEventSourcer implements EventSourcer {
     private long lastCounter;
 
     /* TODO handle cases when no events. files will be empty and lists will be empty */
-    public MetaStoreEventSourcer(String sourceMetastoreUri, String sourceMetastoreKerberosPrincipal,
-                                 String sourceHive2KerberosPrincipal, Partitioner partitioner,
-                                 EventSourcerUtils eventSourcerUtils,  Map<String, Long> lastEventsIdMap)
-        throws Exception {
-
-        sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(sourceMetastoreUri,
-                sourceMetastoreKerberosPrincipal, sourceHive2KerberosPrincipal);
-        eventMetadata = new ReplicationEventMetadata();
+    public MetaStoreEventSourcer(HCatClient sourceMetastoreClient, Partitioner partitioner,
+                                 EventSourcerUtils eventSourcerUtils,
+                                 Map<String, Long> lastEventsIdMap) throws Exception {
+        this.sourceMetastoreClient = sourceMetastoreClient;
+        this.eventMetadata = new ReplicationEventMetadata();
         this.partitioner = partitioner;
         this.eventSourcerUtils = eventSourcerUtils;
         this.lastEventsIdMap = lastEventsIdMap;
@@ -149,7 +145,7 @@ public class MetaStoreEventSourcer implements EventSourcer {
     }
 
 
-    private void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
+    protected void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
                                                String tableName, String srcStagingDirProvider,
                                                String dstStagingDirProvider) throws Exception {
         String srcFilename = null;
@@ -196,6 +192,10 @@ public class MetaStoreEventSourcer implements EventSourcer {
         EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, tableName, srcFilename, tgtFilename);
     }
 
+    public String persistToMetaFile(String jobName) throws Exception {
+        return eventSourcerUtils.persistToMetaFile(eventMetadata, jobName);
+    }
+
     public void cleanUp() throws Exception {
         if (sourceMetastoreClient != null) {
             sourceMetastoreClient.close();

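The remaining MetaStoreEventSourcer changes serve the same goal: processTableReplicationEvents is widened from private to protected so a same-package test can feed replication tasks in directly, and the new public persistToMetaFile exposes the step that writes the accumulated ReplicationEventMetadata and returns the meta file path. A minimal sketch of that flow, assuming only the members shown in this diff:

    import org.apache.hive.hcatalog.api.repl.ReplicationTask;

    import java.util.Iterator;

    // Sketch: drive the sourcer directly, then persist the result.
    class SourcerFlowSketch {
        static String sourceAndPersist(MetaStoreEventSourcer sourcer,
                                       Iterator<ReplicationTask> tasks,
                                       String srcStaging, String dstStaging) throws Exception {
            sourcer.processTableReplicationEvents(tasks, "testdb", "testtable",
                    srcStaging, dstStaging);
            // Returns the path of the persisted meta file for the given job name.
            return sourcer.persistToMetaFile("hiveReplTest");
        }
    }
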
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
new file mode 100644
index 0000000..cdeddaa
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import com.google.common.base.Function;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.hadoop.JailedFileSystem;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.DelimiterUtils;
+import org.apache.falcon.hive.util.EventSourcerUtils;
+import org.apache.falcon.hive.util.HiveDRStatusStore;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for Hive DR export and import.
+ */
+public class HiveDRTest {
+    private FileSystem fileSystem;
+    private HCatClient client;
+    private MetaStoreEventSourcer sourcer;
+    private EmbeddedCluster cluster;
+    private String dbName = "testdb";
+    private String tableName = "testtable";
+    private StagingDirectoryProvider stagingDirectoryProvider;
+    private MessageFactory msgFactory = MessageFactory.getInstance();
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        client = HCatClient.create(new HiveConf());
+        initializeFileSystem();
+        sourcer = new MetaStoreEventSourcer(client, null, new EventSourcerUtils(cluster.getConf(),
+                false, "hiveReplTest"), null);
+        stagingDirectoryProvider = new StagingDirectoryProvider.TrivialImpl("/tmp", "/");
+    }
+
+    private void initializeFileSystem() throws Exception {
+        cluster =  EmbeddedCluster.newCluster("hivedr");
+        fileSystem = new JailedFileSystem();
+        Path storePath = new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH);
+        fileSystem.initialize(LocalFileSystem.getDefaultUri(cluster.getConf()), cluster.getConf());
+        if (fileSystem.exists(storePath)) {
+            fileSystem.delete(storePath, true);
+        }
+        FileSystem.mkdirs(fileSystem, storePath, DRStatusStore.DEFAULT_STORE_PERMISSION);
+        HiveDRStatusStore drStatusStore = new HiveDRStatusStore(fileSystem,
+                fileSystem.getFileStatus(storePath).getGroup());
+    }
+
+    // Dummy mapping used for all db and table name mappings
+    private Function<String, String> debugMapping = new Function<String, String>(){
+        @Nullable
+        @Override
+        public String apply(@Nullable String s) {
+            if (s == null){
+                return null;
+            } else {
+                StringBuilder sb = new StringBuilder(s);
+                return sb.toString() + sb.reverse().toString();
+            }
+        }
+    };
+
+    @Test
+    public void testExportImportReplication() throws Exception {
+        Table t = new Table();
+        t.setDbName(dbName);
+        t.setTableName(tableName);
+        NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+                HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString());
+        event.setDbName(t.getDbName());
+        event.setTableName(t.getTableName());
+
+        HCatNotificationEvent hev = new HCatNotificationEvent(event);
+        ReplicationTask rtask = ReplicationTask.create(client, hev);
+
+        Assert.assertEquals(hev.toString(), rtask.getEvent().toString());
+        verifyExportImportReplicationTask(rtask);
+    }
+
+    private void verifyExportImportReplicationTask(ReplicationTask rtask) throws Exception {
+        Assert.assertEquals(true, rtask.needsStagingDirs());
+        Assert.assertEquals(false, rtask.isActionable());
+
+        rtask.withSrcStagingDirProvider(stagingDirectoryProvider)
+                .withDstStagingDirProvider(stagingDirectoryProvider)
+                .withDbNameMapping(debugMapping)
+                .withTableNameMapping(debugMapping);
+
+        List<ReplicationTask> taskAdd = new ArrayList<ReplicationTask>();
+        taskAdd.add(rtask);
+        sourcer.processTableReplicationEvents(taskAdd.iterator(), dbName, tableName,
+                stagingDirectoryProvider.toString(), stagingDirectoryProvider.toString());
+
+        String metaFileName = sourcer.persistToMetaFile("hiveReplTest");
+        String event = readEventFile(new Path(metaFileName));
+        Assert.assertEquals(event.split(DelimiterUtils.FIELD_DELIM).length, 4);
+        Assert.assertEquals(dbName,
+                new String(Base64.decodeBase64(event.split(DelimiterUtils.FIELD_DELIM)[0]), "UTF-8"));
+        Assert.assertEquals(tableName,
+                new String(Base64.decodeBase64(event.split(DelimiterUtils.FIELD_DELIM)[1]), "UTF-8"));
+
+        String exportStr = readEventFile(new Path(event.split(DelimiterUtils.FIELD_DELIM)[2]));
+        String[] commandList = exportStr.split(DelimiterUtils.NEWLINE_DELIM);
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            Assert.assertEquals(cmd.getEventId(), 42);
+            for(String stmt : cmd.get()) {
+                Assert.assertTrue(stmt.startsWith("EXPORT TABLE"));
+            }
+        }
+
+        String importStr = readEventFile(new Path(event.split(DelimiterUtils.FIELD_DELIM)[3]));
+        commandList = importStr.split(DelimiterUtils.NEWLINE_DELIM);
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            Assert.assertEquals(cmd.getEventId(), 42);
+            for (String stmt : cmd.get()) {
+                Assert.assertTrue(stmt.startsWith("IMPORT TABLE"));
+            }
+        }
+    }
+
+    @Test
+    public void testImportReplication() throws Exception {
+        Table t = new Table();
+        t.setDbName("testdb");
+        t.setTableName("testtable");
+        NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+                HCatConstants.HCAT_DROP_TABLE_EVENT, msgFactory.buildDropTableMessage(t).toString());
+        event.setDbName(t.getDbName());
+        event.setTableName(t.getTableName());
+
+        HCatNotificationEvent hev = new HCatNotificationEvent(event);
+        ReplicationTask rtask = ReplicationTask.create(client, hev);
+
+        Assert.assertEquals(hev.toString(), rtask.getEvent().toString());
+        verifyImportReplicationTask(rtask);
+    }
+
+    private void verifyImportReplicationTask(ReplicationTask rtask) throws Exception {
+        Assert.assertEquals(false, rtask.needsStagingDirs());
+        Assert.assertEquals(true, rtask.isActionable());
+        rtask.withDbNameMapping(debugMapping)
+                .withTableNameMapping(debugMapping);
+
+        List<ReplicationTask> taskAdd = new ArrayList<ReplicationTask>();
+        taskAdd.add(rtask);
+        sourcer.processTableReplicationEvents(taskAdd.iterator(), dbName, tableName,
+                stagingDirectoryProvider.toString(), stagingDirectoryProvider.toString());
+        String persistFileName = sourcer.persistToMetaFile("hiveReplTest");
+        String event = readEventFile(new Path(persistFileName));
+
+        Assert.assertEquals(event.split(DelimiterUtils.FIELD_DELIM).length, 4);
+        Assert.assertEquals(dbName,
+                new String(Base64.decodeBase64(event.split(DelimiterUtils.FIELD_DELIM)[0]), "UTF-8"));
+        Assert.assertEquals(tableName,
+                new String(Base64.decodeBase64(event.split(DelimiterUtils.FIELD_DELIM)[1]), "UTF-8"));
+
+        String exportStr = readEventFile(new Path(event.split(DelimiterUtils.FIELD_DELIM)[2]));
+        String[] commandList = exportStr.split(DelimiterUtils.NEWLINE_DELIM);
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            Assert.assertEquals(cmd.getEventId(), 42);
+            Assert.assertEquals(cmd.get().size(), 0);   // A drop is a metadata-only operation, so the export command list is empty.
+        }
+
+        String importStr = readEventFile(new Path(event.split(DelimiterUtils.FIELD_DELIM)[3]));
+        commandList = importStr.split(DelimiterUtils.NEWLINE_DELIM);
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            Assert.assertEquals(cmd.getEventId(), 42);
+            for (String stmt : cmd.get()) {
+                Assert.assertTrue(stmt.startsWith("DROP TABLE"));
+            }
+        }
+    }
+
+    private long getEventId() {
+        // Does not need to be unique, just non-zero distinct value to test against.
+        return 42;
+    }
+
+    private int getTime() {
+        // Does not need to be actual time, just non-zero distinct value to test against.
+        return 1729;
+    }
+
+    private String readEventFile(Path eventFileName) throws IOException {
+        StringBuilder eventString = new StringBuilder();
+        BufferedReader in = new BufferedReader(new InputStreamReader(
+                fileSystem.open(eventFileName)));
+        try {
+            String line;
+            while ((line=in.readLine())!=null) {
+                eventString.append(line);
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+        return eventString.toString();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        client.close();
+    }
+
+}

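The assertions in HiveDRTest pin down the on-disk meta file layout: each record holds four FIELD_DELIM-separated fields, where the first two are Base64-encoded database and table names and the last two are paths to files holding the serialized export and import commands. A decoding sketch, assuming that layout and the DelimiterUtils constants the test uses:

    import org.apache.commons.codec.binary.Base64;
    import org.apache.falcon.hive.util.DelimiterUtils;

    import java.io.UnsupportedEncodingException;

    // Sketch: decode one meta-file record as the test's assertions imply:
    // [0] Base64(dbName), [1] Base64(tableName), [2] export file, [3] import file.
    final class MetaRecord {
        final String dbName;
        final String tableName;
        final String exportFile;
        final String importFile;

        MetaRecord(String line) throws UnsupportedEncodingException {
            String[] fields = line.split(DelimiterUtils.FIELD_DELIM);
            this.dbName = new String(Base64.decodeBase64(fields[0]), "UTF-8");
            this.tableName = new String(Base64.decodeBase64(fields[1]), "UTF-8");
            this.exportFile = fields[2];
            this.importFile = fields[3];
        }
    }
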
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f2c480..dfaf1c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -307,6 +307,7 @@
                                 <exclude>**/maven-eclipse.xml</exclude>
                                 <exclude>**/.externalToolBuilders/**</exclude>
                                 <exclude>html5-ui/**</exclude>
+                                <exclude>**/metastore_db/**</exclude>
                                 <exclude>**/db1.log</exclude>
                                 <exclude>**/db1.properties</exclude>
                                 <exclude>**/db1.script</exclude>