You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ka...@apache.org on 2012/11/27 20:44:36 UTC

svn commit: r1414351 - in /oozie/branches/hcat-intre: ./ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/hcat/ core/src/test/java/org/apache/oozie/command/coord/

Author: kamrul
Date: Tue Nov 27 19:44:35 2012
New Revision: 1414351

URL: http://svn.apache.org/viewvc?rev=1414351&view=rev
Log:
+OOZIE-1086 Command to check the missing partitions directly against HCatalog server (mohammad)

Added:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
Modified:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
    oozie/branches/hcat-intre/release-log.txt

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1414351&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Tue Nov 27 19:44:35 2012
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.hcat.MetaDataClientWrapper;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XConfiguration;
+
+public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
+    private String actionId;
+    private JPAService jpaService = null;
+    private CoordinatorActionBean coordAction = null;
+
+    public CoordPushDependencyCheckXCommand(String actionId) {
+        super("coord_push_dep_check", "coord_push_dep_check", 0);
+        this.actionId = actionId;
+    }
+
+    @Override
+    protected Void execute() throws CommandException {
+        String pushDeps = coordAction.getPushMissingDependencies();
+        if (pushDeps == null || pushDeps.length() == 0) {
+            LOG.info("Nothing to check. Empty push missing dependency");
+            return null;
+        }
+        LOG.info("Push missing dependencies .. "+ pushDeps);
+        LinkedList<String> pushDepList = new LinkedList<String>();
+        pushDepList.addAll(Arrays.asList(pushDeps.split(CoordELFunctions.DIR_SEPARATOR, -1)));
+       // List<String> pushDepList = Arrays.asList(pushDeps.split(CoordELFunctions.DIR_SEPARATOR, -1));
+        MetaDataClientWrapper mdClientWrap = new MetaDataClientWrapper();
+        Configuration actionConf = null;
+        try {
+            actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
+        }
+        catch (IOException e) {
+            throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
+        }
+        String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
+        mdClientWrap.checkList(pushDepList, user);
+        if (pushDepList.size() > 0) {
+            pushDeps = StringUtils.join(pushDepList, CoordELFunctions.DIR_SEPARATOR);
+            coordAction.setPushMissingDependencies(pushDeps);
+            // Checking for timeout
+            handleTimeout();
+        }
+        else { // All push-based dependencies are available
+            coordAction.setPushMissingDependencies("");
+            if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
+                coordAction.setStatus(CoordinatorAction.Status.READY);
+                // pass jobID to the CoordActionReadyXCommand
+                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
+            }
+        }
+        updateCoordAction(coordAction);
+        return null;
+    }
+
+    private void updateCoordAction(CoordinatorActionBean coordAction2) throws CommandException {
+        coordAction.setLastModifiedTime(new Date());
+        if (jpaService != null) {
+            try {
+                jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+            }
+            catch (JPAExecutorException jex) {
+                throw new CommandException(ErrorCode.E1023, jex.getMessage(), jex);
+            }
+        }
+    }
+
+    private void handleTimeout() {
+        long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
+                .getCreatedTime().getTime()))
+                / (60 * 1000);
+        int timeOut = coordAction.getTimeOut();
+        if ((timeOut >= 0) && (waitingTime > timeOut)) {
+            queue(new CoordActionTimeOutXCommand(coordAction), 100);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#getEntityKey()
+     */
+    @Override
+    public String getEntityKey() {
+        return actionId;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#isLockRequired()
+     */
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#loadState()
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            jpaService = Services.get().get(JPAService.class);
+
+            if (jpaService != null) {
+                coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
+                LogUtils.setLogInfo(coordAction, logInfo);
+            }
+            else {
+                throw new CommandException(ErrorCode.E0610);
+            }
+        }
+        catch (XException ex) {
+            throw new CommandException(ex);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        if (coordAction.getStatus().equals(CoordinatorAction.Status.WAITING) == false) {
+            throw new PreconditionException(ErrorCode.E1100);
+        }
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java?rev=1414351&r1=1414350&r2=1414351&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java Tue Nov 27 19:44:35 2012
@@ -19,6 +19,7 @@ package org.apache.oozie.hcat;
 
 import java.net.URISyntaxException;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 
 import org.apache.hcatalog.api.HCatClient;
@@ -28,12 +29,22 @@ import org.apache.oozie.service.MetaData
 import org.apache.oozie.service.MetaDataAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XLog;
 
 /**
  * This class is a wrapper around the HCatalog client class
  */
 public class MetaDataClientWrapper {
 
+    public XLog LOG = XLog.getLog(getClass());
+    private MetaDataAccessorService mdac = null;
+
+    public MetaDataClientWrapper() {
+        mdac = Services.get().get(MetaDataAccessorService.class);
+        if (mdac == null) {
+            throw new RuntimeException("MetaDataAccessorService is not initialized. Check oozie-site.xml");
+        }
+    }
     /**
      * Query one partition.
      *
@@ -47,7 +58,7 @@ public class MetaDataClientWrapper {
      */
     public HCatPartition getOnePartition(String server, String db, String table, Map<String, String> partition,
             String user) throws MetaDataAccessorException {
-        HCatClient client = Services.get().get(MetaDataAccessorService.class).getHCatClient(server, user);
+        HCatClient client = mdac.getHCatClient(server, user);
         HCatPartition hPartition;
         try {
             hPartition = client.getPartition(db, table, partition);
@@ -90,7 +101,7 @@ public class MetaDataClientWrapper {
      */
     public List<HCatPartition> getPartitionsByFilter(String server, String db, String table, String filter, String user)
             throws MetaDataAccessorException {
-        HCatClient client = Services.get().get(MetaDataAccessorService.class).getHCatClient(server, user);
+        HCatClient client = mdac.getHCatClient(server, user);
         List<HCatPartition> hPartitions;
         try {
             hPartitions = client.listPartitionsByFilter(db, table, filter);
@@ -136,7 +147,7 @@ public class MetaDataClientWrapper {
      */
     public void dropOnePartition(String server, String db, String table, Map<String, String> partition,
             boolean ifExists, String user) throws MetaDataAccessorException {
-        HCatClient client = Services.get().get(MetaDataAccessorService.class).getHCatClient(server, user);
+        HCatClient client = mdac.getHCatClient(server, user);
         try {
             client.dropPartition(db, table, partition, ifExists);
         }
@@ -166,4 +177,26 @@ public class MetaDataClientWrapper {
         return;
     }
 
+    /**
+     * Check the list of dependencies against HCatalog server. If any partition
+     * is already in HCatalog, this method will remove it from the list.
+     *
+     * @param pushDepList : List of push-based dependencies.
+     * @param user : end-user id
+     */
+    public void checkList(List<String> pushDepList, String user) {
+        for (ListIterator<String> iter = pushDepList.listIterator(); iter.hasNext();) {
+            String depURI = iter.next();
+            HCatPartition part = null;
+            try {
+                part = getOnePartition(depURI, user);
+                iter.remove();
+                LOG.info("Found URI " + depURI);
+            }
+            catch (MetaDataAccessorException e) {
+                LOG.info("Not found " + depURI);
+            }
+        }
+    }
+
 }

Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java?rev=1414351&r1=1414350&r2=1414351&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java Tue Nov 27 19:44:35 2012
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.oozie.command.coord;
 
 import java.io.IOException;

Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1414351&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java Tue Nov 27 19:44:35 2012
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.api.HCatCreateDBDesc;
+import org.apache.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.MetaDataAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.XLog;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCoordPushDependencyCheckXCommand extends XDataTestCase {
+    private String TZ;
+    private String server = "local";
+    Services services = null;
+    // private String server = "thrift://localhost:11002"; // to specify the
+    // non-local endpoint.
+    private String isLocal = "true"; // false for non-local instance
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        setSystemProperty("hive.metastore.local", isLocal);
+        services = new Services();
+        addServiceToRun(services.getConf(), MetaDataAccessorService.class.getName());
+        services.init();
+        TZ = (getProcessingTZ().equals(DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT)) ? "Z" : getProcessingTZ()
+                .substring(3);
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    @Test
+    public void testUpdateCoordTableSingleDep() throws Exception {
+        // Test for single dependency which is already in the hcat server
+        String db = "default";
+        String table = "tablename";
+        String newHCatDependency = "hcat://" + server + "/" + db + "/" + table + "/?dt=04/30/2012&country=usa";
+
+        populateTable(server, db, table);
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING, 0);
+
+        new CoordPushDependencyCheckXCommand(actionId).call();
+
+        checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+
+    }
+
+    @Test
+    public void testUpdateCoordTableMultipleDepsV1() throws Exception {
+        // Test for two dependencies which are already in the hcat server
+        String db = "default";
+        String table = "tablename";
+        String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/?dt=04/12/2012&country=brazil";
+        String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table + "/?dt=04/30/2012&country=usa";
+        String newHCatDependency = newHCatDependency1 + CoordELFunctions.DIR_SEPARATOR + newHCatDependency2;
+        populateTable(server, db, table);
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING, 0);
+
+        new CoordPushDependencyCheckXCommand(actionId).call();
+
+        checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+
+    }
+
+    @Test
+    public void testUpdateCoordTableMultipleDepsV2() throws Exception {
+        // Test for two dependencies : one of them is already existed in the
+        // hcat server. Other one is not.
+        // Expected to see the action in WAITING
+        // Later make the other partition also available. action is expected to
+        // be READY
+        String db = "default";
+        String table = "tablename";
+        String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/?dt=04/30/2012&country=brazil";
+        String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table + "/?dt=04/30/2012&country=usa";
+        String newHCatDependency = newHCatDependency1 + CoordELFunctions.DIR_SEPARATOR + newHCatDependency2;
+        populateTable(server, db, table);
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING, 0);
+
+        new CoordPushDependencyCheckXCommand(actionId).call();
+
+        checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING, 0);
+
+        Map<String, String> partMap = new HashMap<String, String>();
+        partMap.put("dt", "04/30/2012");
+        partMap.put("country", "brazil");
+        addOneRecord(server, db, table, partMap);
+
+        new CoordPushDependencyCheckXCommand(actionId).call();
+
+        checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+    }
+
+    private void populateTable(String server, String db, String table) throws Exception {
+        createTable(server, db, table);
+        addRecords(server, db, table);
+
+    }
+
+    private void createTable(String server, String db, String tableName) throws Exception {
+        HCatClient client = services.get(MetaDataAccessorService.class).getHCatClient(server, getTestUser());
+        assertNotNull(client);
+        // Creating a table
+        HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(db).ifNotExists(true).build();
+        client.createDatabase(dbDesc);
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(new HCatFieldSchema("userid", Type.INT, "id columns"));
+        cols.add(new HCatFieldSchema("viewtime", Type.BIGINT, "view time columns"));
+        cols.add(new HCatFieldSchema("pageurl", Type.STRING, ""));
+        cols.add(new HCatFieldSchema("ip", Type.STRING, "IP Address of the User"));
+        ArrayList<HCatFieldSchema> ptnCols = new ArrayList<HCatFieldSchema>();
+        ptnCols.add(new HCatFieldSchema("dt", Type.STRING, "date column"));
+        ptnCols.add(new HCatFieldSchema("country", Type.STRING, "country column"));
+        HCatCreateTableDesc tableDesc = HCatCreateTableDesc.create(db, tableName, cols).fileFormat("sequencefile")
+                .partCols(ptnCols).build();
+        client.dropTable(db, tableName, true);
+        client.createTable(tableDesc);
+        List<String> tables = client.listTableNamesByPattern(db, "*");
+        assertTrue(tables.size() > 0);
+        assertTrue(tables.contains(tableName));
+        List<String> dbNames = client.listDatabaseNamesByPattern(db);
+        assertTrue(dbNames.size() == 1);
+        assertTrue(dbNames.contains(db));
+    }
+
+    private void addRecords(String server, String dbName, String tableName) throws Exception {
+        HCatClient client = services.get(MetaDataAccessorService.class).getHCatClient(server, getTestUser());
+        Map<String, String> firstPtn = new HashMap<String, String>();
+        firstPtn.put("dt", "04/30/2012");
+        firstPtn.put("country", "usa");
+        HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(dbName, tableName, null, firstPtn).build();
+        client.addPartition(addPtn);
+
+        Map<String, String> secondPtn = new HashMap<String, String>();
+        secondPtn.put("dt", "04/12/2012");
+        secondPtn.put("country", "brazil");
+        HCatAddPartitionDesc addPtn2 = HCatAddPartitionDesc.create(dbName, tableName, null, secondPtn).build();
+        client.addPartition(addPtn2);
+
+        Map<String, String> thirdPtn = new HashMap<String, String>();
+        thirdPtn.put("dt", "04/13/2012");
+        thirdPtn.put("country", "brazil");
+        HCatAddPartitionDesc addPtn3 = HCatAddPartitionDesc.create(dbName, tableName, null, thirdPtn).build();
+        client.addPartition(addPtn3);
+    }
+
+    private void addOneRecord(String server, String dbName, String tableName, Map<String, String> partMap)
+            throws Exception {
+        HCatClient client = services.get(MetaDataAccessorService.class).getHCatClient(server, getTestUser());
+        HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(dbName, tableName, null, partMap).build();
+        client.addPartition(addPtn);
+    }
+
+    private CoordinatorActionBean checkCoordAction(String actionId, String expDeps, CoordinatorAction.Status stat,
+            int type) throws Exception {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+            String missDeps = action.getPushMissingDependencies();
+            if (type != 0) {
+                assertEquals(new PartitionWrapper(missDeps), new PartitionWrapper(expDeps));
+            }
+            else {
+                assertEquals(missDeps, expDeps);
+            }
+            assertEquals(action.getStatus(), stat);
+
+            return action;
+        }
+        catch (JPAExecutorException se) {
+            throw new Exception("Action ID " + actionId + " was not stored properly in db");
+        }
+    }
+
+    private String addInitRecords(String pushMissingDependencies) throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59" + TZ);
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59" + TZ);
+        CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                CoordinatorJob.Status.RUNNING, startTime, endTime, false, true, 3);
+
+        CoordinatorActionBean action1 = addRecordToCoordActionTableForWaiting(job.getId(), 1,
+                CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", pushMissingDependencies);
+        return action1.getId();
+    }
+
+    protected CoordinatorActionBean addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
+            CoordinatorAction.Status status, String resourceXmlName, String pushMissingDependencies) throws Exception {
+        CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, 0, TZ);
+        action.setPushMissingDependencies(pushMissingDependencies);
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordActionInsertJPAExecutor coordActionInsertCmd = new CoordActionInsertJPAExecutor(action);
+            jpaService.execute(coordActionInsertCmd);
+        }
+        catch (JPAExecutorException je) {
+            je.printStackTrace();
+            fail("Unable to insert the test coord action record to table");
+            throw je;
+        }
+        return action;
+    }
+
+    protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status,
+            Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
+
+        String testDir = getTestCaseDir();
+        CoordinatorJobBean coordJob = createCoordJob(testFileName, status, start, end, pending, doneMatd, lastActionNum);
+        String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
+        coordJob.setJobXml(appXml);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob);
+            jpaService.execute(coordInsertCmd);
+        }
+        catch (JPAExecutorException je) {
+            je.printStackTrace();
+            fail("Unable to insert the test coord job record to table");
+            throw je;
+        }
+
+        return coordJob;
+    }
+
+    protected String getCoordJobXmlForWaiting(String testFileName, String testDir) {
+        try {
+            Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
+            String appXml = IOUtils.getReaderAsString(reader, -1);
+            appXml = appXml.replaceAll("#testDir", testDir);
+            return appXml;
+        }
+        catch (IOException ioe) {
+            throw new RuntimeException(XLog.format("Could not get " + testFileName, ioe));
+        }
+    }
+
+    protected String getProcessingTZ() {
+        return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
+    }
+
+}

Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1414351&r1=1414350&r2=1414351&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Tue Nov 27 19:44:35 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1086 Command to check the missing partitions directly against HCatalog server (mohammad)
 OOZIE-1050 Implement logic to update dependencies via push JMS message(mona via mohammad)
 OOZIE-1068 Metadata Accessor service for HCatalog(mohammad)
 OOZIE-1069 Update dataIn and dataOut EL functions to support partitions (mohammad)