You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ka...@apache.org on 2012/11/27 20:44:36 UTC
svn commit: r1414351 - in /oozie/branches/hcat-intre: ./
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/hcat/
core/src/test/java/org/apache/oozie/command/coord/
Author: kamrul
Date: Tue Nov 27 19:44:35 2012
New Revision: 1414351
URL: http://svn.apache.org/viewvc?rev=1414351&view=rev
Log:
+OOZIE-1086 Command to check the missing partitions directly against HCatalog server (mohammad)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
oozie/branches/hcat-intre/release-log.txt
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1414351&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Tue Nov 27 19:44:35 2012
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.hcat.MetaDataClientWrapper;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XConfiguration;
+
+/**
+ * Command that checks the push-based missing dependencies of a coordinator
+ * action directly against the HCatalog server.
+ * <p>
+ * Every dependency that {@link MetaDataClientWrapper#checkList} removes from the
+ * list is treated as available. If push dependencies remain, the shortened list
+ * is stored on the action and the action's timeout is evaluated. If none remain
+ * and the action has no other missing dependencies either, the action becomes
+ * READY and a {@link CoordActionReadyXCommand} is queued. The action is
+ * persisted in both cases.
+ */
+public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
+    /** Number of milliseconds in one minute; waiting time is compared in minutes. */
+    private static final long MILLIS_PER_MINUTE = 60 * 1000;
+
+    private final String actionId;
+    private JPAService jpaService = null;
+    private CoordinatorActionBean coordAction = null;
+
+    /**
+     * @param actionId id of the coordinator action whose push dependencies are checked
+     */
+    public CoordPushDependencyCheckXCommand(String actionId) {
+        super("coord_push_dep_check", "coord_push_dep_check", 0);
+        this.actionId = actionId;
+    }
+
+    /**
+     * Checks the action's push missing dependencies and persists the outcome.
+     * Does nothing when the action has no push missing dependencies.
+     *
+     * @return always null
+     * @throws CommandException if the action configuration cannot be parsed or
+     *         the action cannot be updated
+     */
+    @Override
+    protected Void execute() throws CommandException {
+        String pushDeps = coordAction.getPushMissingDependencies();
+        if (pushDeps == null || pushDeps.length() == 0) {
+            LOG.info("Nothing to check. Empty push missing dependency");
+            return null;
+        }
+        LOG.info("Push missing dependencies .. " + pushDeps);
+        // A mutable copy is needed: checkList() removes the available entries
+        // in place, which the fixed-size view from Arrays.asList() cannot do.
+        List<String> pushDepList = new LinkedList<String>(Arrays.asList(pushDeps.split(
+                CoordELFunctions.DIR_SEPARATOR, -1)));
+        MetaDataClientWrapper mdClientWrap = new MetaDataClientWrapper();
+        Configuration actionConf = readActionConf();
+        String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
+        mdClientWrap.checkList(pushDepList, user);
+        if (pushDepList.isEmpty()) {
+            // All push-based dependencies are available
+            coordAction.setPushMissingDependencies("");
+            String missingDeps = coordAction.getMissingDependencies();
+            if (missingDeps == null || missingDeps.length() == 0) {
+                coordAction.setStatus(CoordinatorAction.Status.READY);
+                // pass jobID to the CoordActionReadyXCommand
+                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
+            }
+        }
+        else {
+            coordAction.setPushMissingDependencies(StringUtils.join(pushDepList, CoordELFunctions.DIR_SEPARATOR));
+            // Some dependencies are still missing, so the action may have timed out
+            handleTimeout();
+        }
+        updateCoordAction(coordAction);
+        return null;
+    }
+
+    /**
+     * Parses the run configuration stored on the loaded action.
+     *
+     * @return the action's run configuration
+     * @throws CommandException if the stored configuration cannot be read
+     */
+    private Configuration readActionConf() throws CommandException {
+        try {
+            return new XConfiguration(new StringReader(coordAction.getRunConf()));
+        }
+        catch (IOException e) {
+            throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stamps the given action with the current time and persists it. Nothing
+     * is written when no JPA service is available.
+     *
+     * @param action the action to persist
+     * @throws CommandException if the update fails
+     */
+    private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
+        action.setLastModifiedTime(new Date());
+        if (jpaService != null) {
+            try {
+                jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+            }
+            catch (JPAExecutorException jex) {
+                throw new CommandException(ErrorCode.E1023, jex.getMessage(), jex);
+            }
+        }
+    }
+
+    /**
+     * Queues a {@link CoordActionTimeOutXCommand} when the action has waited
+     * longer than its timeout. The waiting time is counted in minutes from the
+     * later of the action's nominal time and created time. A negative timeout
+     * disables the check.
+     */
+    private void handleTimeout() {
+        long waitStart = Math.max(coordAction.getNominalTime().getTime(), coordAction.getCreatedTime().getTime());
+        long waitingTime = (new Date().getTime() - waitStart) / MILLIS_PER_MINUTE;
+        int timeOut = coordAction.getTimeOut();
+        if ((timeOut >= 0) && (waitingTime > timeOut)) {
+            queue(new CoordActionTimeOutXCommand(coordAction), 100);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#getEntityKey()
+     */
+    @Override
+    public String getEntityKey() {
+        return actionId;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#isLockRequired()
+     */
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#loadState()
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            jpaService = Services.get().get(JPAService.class);
+            if (jpaService == null) {
+                throw new CommandException(ErrorCode.E0610);
+            }
+            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
+            LogUtils.setLogInfo(coordAction, logInfo);
+        }
+        catch (XException ex) {
+            throw new CommandException(ex);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        // Only actions that are still WAITING have dependencies left to check
+        if (!coordAction.getStatus().equals(CoordinatorAction.Status.WAITING)) {
+            throw new PreconditionException(ErrorCode.E1100);
+        }
+    }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java?rev=1414351&r1=1414350&r2=1414351&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/hcat/MetaDataClientWrapper.java Tue Nov 27 19:44:35 2012
@@ -19,6 +19,7 @@ package org.apache.oozie.hcat;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import org.apache.hcatalog.api.HCatClient;
@@ -28,12 +29,22 @@ import org.apache.oozie.service.MetaData
import org.apache.oozie.service.MetaDataAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XLog;
/**
* This class is a wrapper around the HCatalog client class
*/
public class MetaDataClientWrapper {
+    /** Logger for this wrapper instance. */
+    public XLog LOG = XLog.getLog(getClass());
+    /** Service used to obtain HCatalog clients; set once by the constructor. */
+    private MetaDataAccessorService mdac = null;
+
+    /**
+     * Creates a wrapper bound to the MetaDataAccessorService registered with
+     * the running Services instance.
+     *
+     * @throws RuntimeException if no MetaDataAccessorService is registered
+     */
+    public MetaDataClientWrapper() {
+        MetaDataAccessorService accessorService = Services.get().get(MetaDataAccessorService.class);
+        if (accessorService == null) {
+            throw new RuntimeException("MetaDataAccessorService is not initialized. Check oozie-site.xml");
+        }
+        mdac = accessorService;
+    }
/**
* Query one partition.
*
@@ -47,7 +58,7 @@ public class MetaDataClientWrapper {
*/
public HCatPartition getOnePartition(String server, String db, String table, Map<String, String> partition,
String user) throws MetaDataAccessorException {
- HCatClient client = Services.get().get(MetaDataAccessorService.class).getHCatClient(server, user);
+ HCatClient client = mdac.getHCatClient(server, user);
HCatPartition hPartition;
try {
hPartition = client.getPartition(db, table, partition);
@@ -90,7 +101,7 @@ public class MetaDataClientWrapper {
*/
public List<HCatPartition> getPartitionsByFilter(String server, String db, String table, String filter, String user)
throws MetaDataAccessorException {
- HCatClient client = Services.get().get(MetaDataAccessorService.class).getHCatClient(server, user);
+ HCatClient client = mdac.getHCatClient(server, user);
List<HCatPartition> hPartitions;
try {
hPartitions = client.listPartitionsByFilter(db, table, filter);
@@ -136,7 +147,7 @@ public class MetaDataClientWrapper {
*/
public void dropOnePartition(String server, String db, String table, Map<String, String> partition,
boolean ifExists, String user) throws MetaDataAccessorException {
- HCatClient client = Services.get().get(MetaDataAccessorService.class).getHCatClient(server, user);
+ HCatClient client = mdac.getHCatClient(server, user);
try {
client.dropPartition(db, table, partition, ifExists);
}
@@ -166,4 +177,26 @@ public class MetaDataClientWrapper {
return;
}
+    /**
+     * Check the list of dependencies against HCatalog server. Every dependency
+     * whose partition lookup succeeds is removed from the list, so on return
+     * the list holds only the dependencies that could not be confirmed.
+     * <p>
+     * A lookup that fails with {@link MetaDataAccessorException} is logged
+     * together with the failure message and the dependency is kept in the
+     * list, whatever the reason for the failure.
+     *
+     * @param pushDepList : List of push-based dependencies. It is modified in
+     *        place, so it must support removal through its iterator. A null
+     *        list is ignored.
+     * @param user : end-user id
+     */
+    public void checkList(List<String> pushDepList, String user) {
+        if (pushDepList == null) {
+            return;
+        }
+        for (ListIterator<String> iter = pushDepList.listIterator(); iter.hasNext();) {
+            String depURI = iter.next();
+            try {
+                // Only the outcome of the lookup matters; the partition itself is not used
+                getOnePartition(depURI, user);
+                iter.remove();
+                LOG.info("Found URI " + depURI);
+            }
+            catch (MetaDataAccessorException e) {
+                // Best effort: keep the dependency as missing, but record why the
+                // lookup failed so that server errors are not mistaken for absent data
+                LOG.info("Not found " + depURI + " : " + e.getMessage());
+            }
+        }
+    }
+
}
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java?rev=1414351&r1=1414350&r2=1414351&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java Tue Nov 27 19:44:35 2012
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.oozie.command.coord;
import java.io.IOException;
Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1414351&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java Tue Nov 27 19:44:35 2012
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.api.HCatCreateDBDesc;
+import org.apache.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.MetaDataAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.XLog;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CoordPushDependencyCheckXCommand}: a WAITING coordinator
+ * action is created with push missing dependencies that point at HCatalog
+ * partitions, the command is run, and the action's remaining push
+ * dependencies and status are verified.
+ */
+public class TestCoordPushDependencyCheckXCommand extends XDataTestCase {
+    private static final String DB = "default";
+    private static final String TABLE = "tablename";
+
+    private String TZ;
+    private String server = "local";
+    Services services = null;
+    // private String server = "thrift://localhost:11002"; // to specify the
+    // non-local endpoint.
+    private String isLocal = "true"; // false for non-local instance
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        setSystemProperty("hive.metastore.local", isLocal);
+        services = new Services();
+        addServiceToRun(services.getConf(), MetaDataAccessorService.class.getName());
+        services.init();
+        TZ = (getProcessingTZ().equals(DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT)) ? "Z" : getProcessingTZ()
+                .substring(3);
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    @Test
+    public void testUpdateCoordTableSingleDep() throws Exception {
+        // Single dependency whose partition is already in the hcat server
+        String newHCatDependency = partitionUri("04/30/2012", "usa");
+
+        populateTable(server, DB, TABLE);
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING, 0);
+
+        new CoordPushDependencyCheckXCommand(actionId).call();
+
+        checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+    }
+
+    @Test
+    public void testUpdateCoordTableMultipleDepsV1() throws Exception {
+        // Two dependencies whose partitions are both already in the hcat server
+        String newHCatDependency1 = partitionUri("04/12/2012", "brazil");
+        String newHCatDependency2 = partitionUri("04/30/2012", "usa");
+        String newHCatDependency = newHCatDependency1 + CoordELFunctions.DIR_SEPARATOR + newHCatDependency2;
+        populateTable(server, DB, TABLE);
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING, 0);
+
+        new CoordPushDependencyCheckXCommand(actionId).call();
+
+        checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+    }
+
+    @Test
+    public void testUpdateCoordTableMultipleDepsV2() throws Exception {
+        // Two dependencies: one already exists in the hcat server, the other
+        // does not, so the action is expected to stay WAITING with only the
+        // missing dependency left. Once the other partition is made available
+        // as well, the action is expected to become READY.
+        String newHCatDependency1 = partitionUri("04/30/2012", "brazil");
+        String newHCatDependency2 = partitionUri("04/30/2012", "usa");
+        String newHCatDependency = newHCatDependency1 + CoordELFunctions.DIR_SEPARATOR + newHCatDependency2;
+        populateTable(server, DB, TABLE);
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING, 0);
+
+        new CoordPushDependencyCheckXCommand(actionId).call();
+
+        checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING, 0);
+
+        addOneRecord(server, DB, TABLE, partitionSpec("04/30/2012", "brazil"));
+
+        new CoordPushDependencyCheckXCommand(actionId).call();
+
+        checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+    }
+
+    /**
+     * Builds the hcat URI of one partition of the test table on the test server.
+     */
+    private String partitionUri(String dt, String country) {
+        return "hcat://" + server + "/" + DB + "/" + TABLE + "/?dt=" + dt + "&country=" + country;
+    }
+
+    /**
+     * Builds the partition key/value map for the test table's two partition columns.
+     */
+    private static Map<String, String> partitionSpec(String dt, String country) {
+        Map<String, String> spec = new HashMap<String, String>();
+        spec.put("dt", dt);
+        spec.put("country", country);
+        return spec;
+    }
+
+    /**
+     * Recreates the test table and adds the initial set of partitions to it.
+     */
+    private void populateTable(String server, String db, String table) throws Exception {
+        createTable(server, db, table);
+        addRecords(server, db, table);
+    }
+
+    /**
+     * Creates the database if needed, then drops and recreates the table with
+     * partition columns "dt" and "country".
+     */
+    private void createTable(String server, String db, String tableName) throws Exception {
+        HCatClient client = services.get(MetaDataAccessorService.class).getHCatClient(server, getTestUser());
+        assertNotNull(client);
+        // Creating a table
+        HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(db).ifNotExists(true).build();
+        client.createDatabase(dbDesc);
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(new HCatFieldSchema("userid", Type.INT, "id columns"));
+        cols.add(new HCatFieldSchema("viewtime", Type.BIGINT, "view time columns"));
+        cols.add(new HCatFieldSchema("pageurl", Type.STRING, ""));
+        cols.add(new HCatFieldSchema("ip", Type.STRING, "IP Address of the User"));
+        ArrayList<HCatFieldSchema> ptnCols = new ArrayList<HCatFieldSchema>();
+        ptnCols.add(new HCatFieldSchema("dt", Type.STRING, "date column"));
+        ptnCols.add(new HCatFieldSchema("country", Type.STRING, "country column"));
+        HCatCreateTableDesc tableDesc = HCatCreateTableDesc.create(db, tableName, cols).fileFormat("sequencefile")
+                .partCols(ptnCols).build();
+        client.dropTable(db, tableName, true);
+        client.createTable(tableDesc);
+        List<String> tables = client.listTableNamesByPattern(db, "*");
+        assertTrue(tables.size() > 0);
+        assertTrue(tables.contains(tableName));
+        List<String> dbNames = client.listDatabaseNamesByPattern(db);
+        assertEquals(1, dbNames.size());
+        assertTrue(dbNames.contains(db));
+    }
+
+    /**
+     * Adds the three partitions every test starts with.
+     */
+    private void addRecords(String server, String dbName, String tableName) throws Exception {
+        HCatClient client = services.get(MetaDataAccessorService.class).getHCatClient(server, getTestUser());
+        addPartition(client, dbName, tableName, partitionSpec("04/30/2012", "usa"));
+        addPartition(client, dbName, tableName, partitionSpec("04/12/2012", "brazil"));
+        addPartition(client, dbName, tableName, partitionSpec("04/13/2012", "brazil"));
+    }
+
+    /**
+     * Adds a single partition described by the given key/value map.
+     */
+    private void addOneRecord(String server, String dbName, String tableName, Map<String, String> partMap)
+            throws Exception {
+        HCatClient client = services.get(MetaDataAccessorService.class).getHCatClient(server, getTestUser());
+        addPartition(client, dbName, tableName, partMap);
+    }
+
+    /**
+     * Adds one partition to the table through the given client.
+     */
+    private static void addPartition(HCatClient client, String dbName, String tableName, Map<String, String> partMap)
+            throws Exception {
+        HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(dbName, tableName, null, partMap).build();
+        client.addPartition(addPtn);
+    }
+
+    /**
+     * Loads the action and asserts its push missing dependencies and status.
+     *
+     * @param actionId id of the action to load
+     * @param expDeps expected push missing dependencies
+     * @param stat expected action status
+     * @param type 0 to compare the dependency strings directly, any other
+     *        value to compare them wrapped in {@link PartitionWrapper}
+     * @return the loaded action
+     * @throws Exception if the action cannot be loaded
+     */
+    private CoordinatorActionBean checkCoordAction(String actionId, String expDeps, CoordinatorAction.Status stat,
+            int type) throws Exception {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+            String missDeps = action.getPushMissingDependencies();
+            // Expected value goes first so that failure messages read correctly
+            if (type != 0) {
+                assertEquals(new PartitionWrapper(expDeps), new PartitionWrapper(missDeps));
+            }
+            else {
+                assertEquals(expDeps, missDeps);
+            }
+            assertEquals(stat, action.getStatus());
+
+            return action;
+        }
+        catch (JPAExecutorException se) {
+            // Keep the cause so that the underlying database error stays visible
+            throw new Exception("Action ID " + actionId + " was not stored properly in db", se);
+        }
+    }
+
+    /**
+     * Inserts a RUNNING coordinator job and one WAITING action that carries
+     * the given push missing dependencies.
+     *
+     * @return the id of the inserted action
+     */
+    private String addInitRecords(String pushMissingDependencies) throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59" + TZ);
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59" + TZ);
+        CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                CoordinatorJob.Status.RUNNING, startTime, endTime, false, true, 3);
+
+        CoordinatorActionBean action1 = addRecordToCoordActionTableForWaiting(job.getId(), 1,
+                CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", pushMissingDependencies);
+        return action1.getId();
+    }
+
+    /**
+     * Creates a coordinator action with the given push missing dependencies
+     * and inserts it; the test fails if the insert fails.
+     */
+    protected CoordinatorActionBean addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
+            CoordinatorAction.Status status, String resourceXmlName, String pushMissingDependencies) throws Exception {
+        CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, 0, TZ);
+        action.setPushMissingDependencies(pushMissingDependencies);
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordActionInsertJPAExecutor coordActionInsertCmd = new CoordActionInsertJPAExecutor(action);
+            jpaService.execute(coordActionInsertCmd);
+        }
+        catch (JPAExecutorException je) {
+            // fail() does not carry a cause, so print the trace before failing
+            je.printStackTrace();
+            fail("Unable to insert the test coord action record to table");
+        }
+        return action;
+    }
+
+    /**
+     * Creates a coordinator job whose job XML comes from the given test
+     * resource and inserts it; the test fails if the insert fails.
+     */
+    protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status,
+            Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
+
+        String testDir = getTestCaseDir();
+        CoordinatorJobBean coordJob = createCoordJob(testFileName, status, start, end, pending, doneMatd, lastActionNum);
+        String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
+        coordJob.setJobXml(appXml);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob);
+            jpaService.execute(coordInsertCmd);
+        }
+        catch (JPAExecutorException je) {
+            // fail() does not carry a cause, so print the trace before failing
+            je.printStackTrace();
+            fail("Unable to insert the test coord job record to table");
+        }
+
+        return coordJob;
+    }
+
+    /**
+     * Reads the given test resource and replaces every "#testDir" marker with
+     * the test directory.
+     *
+     * @throws RuntimeException if the resource cannot be read
+     */
+    protected String getCoordJobXmlForWaiting(String testFileName, String testDir) {
+        try {
+            Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
+            try {
+                String appXml = IOUtils.getReaderAsString(reader, -1);
+                // Literal replacement: replaceAll() would interpret '\' and '$'
+                // in the directory path as regex replacement syntax
+                return appXml.replace("#testDir", testDir);
+            }
+            finally {
+                // Closing an already closed reader has no effect
+                reader.close();
+            }
+        }
+        catch (IOException ioe) {
+            throw new RuntimeException("Could not get " + testFileName, ioe);
+        }
+    }
+
+    protected String getProcessingTZ() {
+        return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
+    }
+
+}
Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1414351&r1=1414350&r2=1414351&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Tue Nov 27 19:44:35 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1086 Command to check the missing partitions directly against HCatalog server (mohammad)
OOZIE-1050 Implement logic to update dependencies via push JMS message(mona via mohammad)
OOZIE-1068 Metadata Accessor service for HCatalog(mohammad)
OOZIE-1069 Update dataIn and dataOut EL functions to support partitions (mohammad)