You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/09/10 17:27:41 UTC
svn commit: r1521527 [1/3] - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/compression/
core/src/main/java/org/apache/oozie/executor/jpa/
core/src/main/java/org/apache/oozie/service/ core/src/main/java/org...
Author: virag
Date: Tue Sep 10 15:27:39 2013
New Revision: 1521527
URL: http://svn.apache.org/r1521527
Log:
OOZIE-1462 Compress lob columns before storing in database (virag)
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/BinaryBlob.java
oozie/trunk/core/src/main/java/org/apache/oozie/StringBlob.java
oozie/trunk/core/src/main/java/org/apache/oozie/compression/
oozie/trunk/core/src/main/java/org/apache/oozie/compression/CodecFactory.java
oozie/trunk/core/src/main/java/org/apache/oozie/compression/CompressionCodec.java
oozie/trunk/core/src/main/java/org/apache/oozie/compression/GzipCompressionCodec.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BinaryBlobValueHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/StringBlobValueHandler.java
oozie/trunk/core/src/test/java/org/apache/oozie/compression/
oozie/trunk/core/src/test/java/org/apache/oozie/compression/TestCodecFactory.java
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobInfoGetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetNeedStartJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetPausedJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetRunningOrPendingJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetUnpausedJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInfoJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionForNominalTimeJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsNotCompletedJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetPausedJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetPendingJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsGetUnpausedJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionSubsetGetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsGetForJobJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsRunningGetJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetActionsJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/JPAService.java
oozie/trunk/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
oozie/trunk/core/src/main/java/org/apache/oozie/store/WorkflowStore.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionQueryExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
oozie/trunk/release-log.txt
Added: oozie/trunk/core/src/main/java/org/apache/oozie/BinaryBlob.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/BinaryBlob.java?rev=1521527&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/BinaryBlob.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/BinaryBlob.java Tue Sep 10 15:27:39 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import org.apache.oozie.compression.CodecFactory;
+import org.apache.oozie.compression.CompressionCodec;
+
+/**
+ * BinaryBlob to maintain compress and uncompressed data
+ */
+public class BinaryBlob {
+
+ private byte[] rawBlob;
+ private byte[] bytes;
+
+ /**
+ * Construct a binaryblob
+ *
+ * @param byteArray
+ * @param isUncompressed - true if data is uncompressed
+ */
+ public BinaryBlob(byte[] byteArray, boolean isUncompressed) {
+ if (isUncompressed) {
+ this.bytes = byteArray;
+ this.rawBlob = null;
+ }
+ else {
+ this.rawBlob = byteArray;
+ }
+ }
+
+ /**
+ * Set bytes
+ *
+ * @param byteArray
+ */
+ public void setBytes(byte[] byteArray) {
+ this.bytes = byteArray;
+ this.rawBlob = null;
+ }
+
+ /**
+ * Returns a decompressed byte array
+ *
+ * @return byte array
+ */
+ public byte[] getBytes() {
+ if (bytes != null) {
+ return bytes;
+ }
+ if (rawBlob == null) {
+ return null;
+ }
+ try {
+ DataInputStream dais = new DataInputStream(new ByteArrayInputStream(rawBlob));
+ CompressionCodec codec = CodecFactory.getDeCompressionCodec(dais);
+ if (codec != null) {
+ bytes = codec.decompressToBytes(dais);
+ }
+ else {
+ bytes = rawBlob;
+ }
+ dais.close();
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ rawBlob = null;
+ return bytes;
+
+ }
+
+ /**
+ * Returns a raw blob
+ *
+ * @return raw blob
+ */
+ public byte[] getRawBlob() {
+ if (rawBlob != null) {
+ return rawBlob;
+ }
+ if (bytes == null) {
+ return null;
+ }
+ if (CodecFactory.isCompressionEnabled()) {
+ byte[] headerBytes = CodecFactory.getHeaderBytes();
+ try {
+ rawBlob = CodecFactory.getCompressionCodec().compressBytes(headerBytes, bytes);
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ else {
+ rawBlob = bytes;
+ }
+ return rawBlob;
+ }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/BundleJobBean.java Tue Sep 10 15:27:39 2013
@@ -48,6 +48,7 @@ import org.apache.oozie.client.rest.Json
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;
+import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@@ -120,9 +121,11 @@ public class BundleJobBean implements Wr
@Column(name = "external_id")
private String externalId = null;
+ @Basic
@Column(name = "conf")
@Lob
- private String conf = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob conf;
@Basic
@Column(name = "time_out")
@@ -183,13 +186,17 @@ public class BundleJobBean implements Wr
@Column(name = "suspended_time")
private java.sql.Timestamp suspendedTimestamp = null;
+ @Basic
@Column(name = "job_xml")
@Lob
- private String jobXml = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob jobXml;
+ @Basic
@Column(name = "orig_job_xml")
@Lob
- private String origJobXml = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob origJobXml = null;
@Transient
@@ -354,30 +361,57 @@ public class BundleJobBean implements Wr
* @return the jobXml
*/
public String getJobXml() {
- return jobXml;
+ return jobXml == null ? null : jobXml.getString();
}
/**
* @param jobXml the jobXml to set
*/
public void setJobXml(String jobXml) {
- this.jobXml = jobXml;
+ if (this.jobXml == null) {
+ this.jobXml = new StringBlob(jobXml);
+ }
+ else {
+ this.jobXml.setString(jobXml);
+ }
+
+ }
+
+ public void setJobXmlBlob (StringBlob jobXmlBlob) {
+ this.jobXml = jobXmlBlob;
+ }
+
+ public StringBlob getJobXmlBlob() {
+ return jobXml;
}
/**
* @return the origJobXml
*/
public String getOrigJobXml() {
- return origJobXml;
+ return origJobXml == null ? null : origJobXml.getString();
}
/**
* @param origJobXml the origJobXml to set
*/
public void setOrigJobXml(String origJobXml) {
+ if (this.origJobXml == null) {
+ this.origJobXml = new StringBlob(origJobXml);
+ }
+ else {
+ this.origJobXml.setString(origJobXml);
+ }
+ }
+
+ public void setOrigJobXmlBlob (StringBlob origJobXml) {
this.origJobXml = origJobXml;
}
+ public StringBlob getOrigJobXmlBlob() {
+ return origJobXml;
+ }
+
/**
* @param createTime the createdTime to set
*/
@@ -499,7 +533,7 @@ public class BundleJobBean implements Wr
json.put(JsonTags.BUNDLE_JOB_NAME, appName);
json.put(JsonTags.BUNDLE_JOB_ID, id);
json.put(JsonTags.BUNDLE_JOB_EXTERNAL_ID, externalId);
- json.put(JsonTags.BUNDLE_JOB_CONF, conf);
+ json.put(JsonTags.BUNDLE_JOB_CONF, getConf());
json.put(JsonTags.BUNDLE_JOB_STATUS, getStatus().toString());
json.put(JsonTags.BUNDLE_JOB_TIMEUNIT, getTimeUnit().toString());
json.put(JsonTags.BUNDLE_JOB_TIMEOUT, timeOut);
@@ -530,7 +564,7 @@ public class BundleJobBean implements Wr
@Override
public String getConf() {
- return conf;
+ return conf == null ? null : conf.getString();
}
@Override
@@ -619,9 +653,22 @@ public class BundleJobBean implements Wr
* @param conf the conf to set
*/
public void setConf(String conf) {
+ if (this.conf == null) {
+ this.conf = new StringBlob(conf);
+ }
+ else {
+ this.conf.setString(conf);
+ }
+ }
+
+ public void setConfBlob(StringBlob conf) {
this.conf = conf;
}
+ public StringBlob getConfBlob() {
+ return conf;
+ }
+
/**
* Set status
*
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Tue Sep 10 15:27:39 2013
@@ -46,6 +46,7 @@ import org.apache.oozie.client.rest.Json
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;
+import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@@ -204,9 +205,11 @@ public class CoordinatorActionBean imple
@Column(name = "external_id")
private String externalId;
+ @Basic
@Column(name = "sla_xml")
@Lob
- private String slaXml = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob slaXml = null;
@Basic
@Column(name = "pending")
@@ -220,29 +223,39 @@ public class CoordinatorActionBean imple
@Column(name = "action_number")
private int actionNumber;
+ @Basic
@Column(name = "created_conf")
@Lob
- private String createdConf;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob createdConf;
@Basic
@Column(name = "time_out")
private int timeOut = 0;
+ @Basic
@Column(name = "run_conf")
@Lob
- private String runConf;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob runConf;
+ @Basic
@Column(name = "action_xml")
@Lob
- private String actionXml;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob actionXml;
+ @Basic
@Column(name = "missing_dependencies")
@Lob
- private String missingDependencies;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob missingDependencies;
+ @Basic
@Column(name = "push_missing_dependencies")
@Lob
- private String pushMissingDependencies;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob pushMissingDependencies;
@Basic
@Column(name = "external_status")
@@ -417,14 +430,27 @@ public class CoordinatorActionBean imple
this.externalId = externalId;
}
- public String getSlaXml() {
+ public StringBlob getSlaXmlBlob() {
return slaXml;
}
- public void setSlaXml(String slaXml) {
+ public void setSlaXmlBlob(StringBlob slaXml) {
this.slaXml = slaXml;
}
+ public String getSlaXml() {
+ return slaXml == null ? null : slaXml.getString();
+ }
+
+ public void setSlaXml(String slaXml) {
+ if (this.slaXml == null) {
+ this.slaXml = new StringBlob(slaXml);
+ }
+ else {
+ this.slaXml.setString(slaXml);
+ }
+ }
+
/**
* @return true if in terminal status
*/
@@ -536,41 +562,93 @@ public class CoordinatorActionBean imple
@Override
public String getCreatedConf() {
- return createdConf;
+ return createdConf == null ? null : createdConf.getString();
}
public void setCreatedConf(String createdConf) {
+ if (this.createdConf == null) {
+ this.createdConf = new StringBlob(createdConf);
+ }
+ else {
+ this.createdConf.setString(createdConf);
+ }
+ }
+
+ public void setCreatedConfBlob(StringBlob createdConf) {
this.createdConf = createdConf;
}
+ public StringBlob getCreatedConfBlob() {
+ return createdConf;
+ }
public void setRunConf(String runConf) {
- this.runConf = runConf;
+ if (this.runConf == null) {
+ this.runConf = new StringBlob(runConf);
+ }
+ else {
+ this.runConf.setString(runConf);
+ }
}
@Override
public String getRunConf() {
+ return runConf == null ? null : runConf.getString();
+ }
+
+ public void setRunConfBlob(StringBlob runConf) {
+ this.runConf = runConf;
+ }
+
+ public StringBlob getRunConfBlob() {
return runConf;
}
+
public void setMissingDependencies(String missingDependencies) {
- this.missingDependencies = missingDependencies;
+ if (this.missingDependencies == null) {
+ this.missingDependencies = new StringBlob(missingDependencies);
+ }
+ else {
+ this.missingDependencies.setString(missingDependencies);
+ }
}
@Override
public String getMissingDependencies() {
+ return missingDependencies == null ? null : missingDependencies.getString();
+ }
+
+ public void setMissingDependenciesBlob(StringBlob missingDependencies) {
+ this.missingDependencies = missingDependencies;
+ }
+
+ public StringBlob getMissingDependenciesBlob() {
return missingDependencies;
}
@Override
public String getPushMissingDependencies() {
- return pushMissingDependencies;
+ return pushMissingDependencies == null ? null : pushMissingDependencies.getString();
}
public void setPushMissingDependencies(String pushMissingDependencies) {
+ if (this.pushMissingDependencies == null) {
+ this.pushMissingDependencies = new StringBlob(pushMissingDependencies);
+ }
+ else {
+ this.pushMissingDependencies.setString(pushMissingDependencies);
+ }
+ }
+
+ public void setPushMissingDependenciesBlob(StringBlob pushMissingDependencies) {
this.pushMissingDependencies = pushMissingDependencies;
}
+ public StringBlob getPushMissingDependenciesBlob() {
+ return pushMissingDependencies;
+ }
+
public String getExternalStatus() {
return externalStatus;
}
@@ -613,13 +691,26 @@ public class CoordinatorActionBean imple
}
public String getActionXml() {
- return actionXml;
+ return actionXml == null ? null : actionXml.getString();
}
public void setActionXml(String actionXml) {
+ if (this.actionXml == null) {
+ this.actionXml = new StringBlob(actionXml);
+ }
+ else {
+ this.actionXml.setString(actionXml);
+ }
+ }
+
+ public void setActionXmlBlob(StringBlob actionXml) {
this.actionXml = actionXml;
}
+ public StringBlob getActionXmlBlob() {
+ return actionXml;
+ }
+
@Override
public String toString() {
return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]",
@@ -650,7 +741,7 @@ public class CoordinatorActionBean imple
json.put(JsonTags.COORDINATOR_JOB_ID, jobId);
json.put(JsonTags.COORDINATOR_ACTION_TYPE, type);
json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber);
- json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, createdConf);
+ json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf());
json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils
.formatDateRfc822(getCreatedTime(), timeZoneId));
json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils
@@ -659,15 +750,15 @@ public class CoordinatorActionBean imple
// json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
// .formatDateRfc822(startTime), timeZoneId);
json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr);
- json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, runConf);
+ json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf());
json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, JsonUtils
.formatDateRfc822(getLastModifiedTime(), timeZoneId));
// json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
// .formatDateRfc822(startTime), timeZoneId);
// json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
// .formatDateRfc822(endTime), timeZoneId);
- json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, missingDependencies);
- json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, pushMissingDependencies);
+ json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getMissingDependencies());
+ json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushMissingDependencies());
json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java Tue Sep 10 15:27:39 2013
@@ -45,6 +45,7 @@ import org.apache.oozie.client.rest.Json
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;
+import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@@ -129,9 +130,11 @@ public class CoordinatorJobBean implemen
@Column(name = "external_id")
private String externalId = null;
+ @Basic
@Column(name = "conf")
@Lob
- private String conf = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob conf = null;
@Basic
@Column(name = "frequency")
@@ -227,18 +230,23 @@ public class CoordinatorJobBean implemen
@Column(name = "suspended_time")
private java.sql.Timestamp suspendedTimestamp = null;
+ @Basic
@Column(name = "job_xml")
@Lob
- private String jobXml = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob jobXml = null;
+ @Basic
@Column(name = "orig_job_xml")
@Lob
- private String origJobXml = null;
-
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob origJobXml = null;
+ @Basic
@Column(name = "sla_xml")
@Lob
- private String slaXml = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob slaXml = null;
@Basic
@Column(name = "pending")
@@ -348,7 +356,7 @@ public class CoordinatorJobBean implemen
* @return job xml
*/
public String getJobXml() {
- return jobXml;
+ return jobXml == null ? null : jobXml.getString();
}
/**
@@ -357,7 +365,20 @@ public class CoordinatorJobBean implemen
* @param jobXml job xml
*/
public void setJobXml(String jobXml) {
- this.jobXml = jobXml;
+ if (this.jobXml == null) {
+ this.jobXml = new StringBlob(jobXml);
+ }
+ else {
+ this.jobXml.setString(jobXml);
+ }
+ }
+
+ public void setJobXmlBlob (StringBlob jobXmlBlob) {
+ this.jobXml = jobXmlBlob;
+ }
+
+ public StringBlob getJobXmlBlob() {
+ return jobXml;
}
/**
@@ -366,7 +387,7 @@ public class CoordinatorJobBean implemen
* @return original job xml
*/
public String getOrigJobXml() {
- return origJobXml;
+ return origJobXml == null ? null : origJobXml.getString();
}
/**
@@ -375,16 +396,29 @@ public class CoordinatorJobBean implemen
* @param origJobXml
*/
public void setOrigJobXml(String origJobXml) {
+ if (this.origJobXml == null) {
+ this.origJobXml = new StringBlob(origJobXml);
+ }
+ else {
+ this.origJobXml.setString(origJobXml);
+ }
+ }
+
+ public void setOrigJobXmlBlob (StringBlob origJobXml) {
this.origJobXml = origJobXml;
}
+ public StringBlob getOrigJobXmlBlob() {
+ return origJobXml;
+ }
+
/**
* Get sla xml
*
* @return sla xml
*/
public String getSlaXml() {
- return slaXml;
+ return slaXml == null ? null : slaXml.getString();
}
/**
@@ -393,9 +427,23 @@ public class CoordinatorJobBean implemen
* @param slaXml sla xml
*/
public void setSlaXml(String slaXml) {
+ if (this.slaXml == null) {
+ this.slaXml = new StringBlob(slaXml);
+ }
+ else {
+ this.slaXml.setString(slaXml);
+ }
+ }
+
+ public void setSlaXmlBlob(StringBlob slaXml) {
this.slaXml = slaXml;
}
+ public StringBlob getSlaXmlBlob() {
+ return slaXml;
+ }
+
+
/**
* Set last action timestamp
@@ -809,13 +857,26 @@ public class CoordinatorJobBean implemen
}
public String getConf() {
- return conf;
+ return conf == null ? null : conf.getString();
}
public void setConf(String conf) {
+ if (this.conf == null) {
+ this.conf = new StringBlob(conf);
+ }
+ else {
+ this.conf.setString(conf);
+ }
+ }
+
+ public void setConfBlob(StringBlob conf) {
this.conf = conf;
}
+ public StringBlob getConfBlob() {
+ return conf;
+ }
+
public void setFrequency(String frequency) {
this.frequency = frequency;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Tue Sep 10 15:27:39 2013
@@ -238,7 +238,6 @@ public enum ErrorCode {
E1602(XLog.STD, "Cannot retrieve Topic name [{0}]"),
E1700(XLog.STD, "Issue communicating with ZooKeeper: {0}"),
-
ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
private String template;
Added: oozie/trunk/core/src/main/java/org/apache/oozie/StringBlob.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/StringBlob.java?rev=1521527&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/StringBlob.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/StringBlob.java Tue Sep 10 15:27:39 2013
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.oozie.compression.CodecFactory;
+import org.apache.oozie.compression.CompressionCodec;
+
+/**
+ * StringBlob to maintain compress and uncompressed data
+ */
+public class StringBlob {
+
+ private byte[] rawBlob;
+ private String string;
+
+ /**
+ * Construct string blob from compressed byte array
+ *
+ * @param byteArray
+ */
+ public StringBlob(byte[] byteArray) {
+ this.rawBlob = byteArray;
+ }
+
+ /**
+ * Construct StringBlob with uncompressed string
+ *
+ * @param string
+ */
+ public StringBlob(String inputString) {
+ this.string = inputString;
+ this.rawBlob = null;
+ }
+
+ /**
+ * Set string
+ *
+ * @param str
+ */
+ public void setString(String str) {
+ this.string = str;
+ this.rawBlob = null;
+ }
+
+ /**
+ * Get uncompressed string
+ *
+ * @return uncompressed string
+ */
+ public String getString() {
+ if (string != null) {
+ return string;
+ }
+ if (rawBlob == null) {
+ return null;
+ }
+ try {
+ DataInputStream dais = new DataInputStream(new ByteArrayInputStream(rawBlob));
+ CompressionCodec codec = CodecFactory.getDeCompressionCodec(dais);
+ if (codec != null) {
+ string = codec.decompressToString(dais);
+ }
+ else {
+ string = new String(rawBlob, CodecFactory.UTF_8_ENCODING);
+ }
+ dais.close();
+
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ rawBlob = null;
+ return string;
+ }
+
+ /**
+ * Get raw blob
+ *
+ * @return raw blob
+ */
+ public byte[] getRawBlob() {
+ if (rawBlob != null) {
+ return rawBlob;
+ }
+ if (string == null) {
+ return null;
+ }
+ if (CodecFactory.isCompressionEnabled()) {
+ byte[] bytes = CodecFactory.getHeaderBytes();
+ try {
+ rawBlob = CodecFactory.getCompressionCodec().compressString(bytes, string);
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ else {
+ rawBlob = string.getBytes();
+ }
+ return rawBlob;
+ }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java Tue Sep 10 15:27:39 2013
@@ -45,11 +45,14 @@ import org.apache.oozie.util.ParamChecke
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;
+import org.apache.openjpa.persistence.jdbc.Strategy;
+
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
/**
- * Bean that contains all the information to start an action for a workflow node.
+ * Bean that contains all the information to start an action for a workflow
+ * node.
*/
@Entity
@NamedQueries({
@@ -93,11 +96,8 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.statusStr = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
@NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.statusStr = 'START_RETRY' OR a.statusStr = 'START_MANUAL' OR a.statusStr = 'END_RETRY' OR a.statusStr = 'END_MANUAL')") })
-
@Table(name = "WF_ACTIONS")
public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
-
-
@Id
private String id;
@@ -144,9 +144,11 @@ public class WorkflowActionBean implemen
@Column(name = "log_token")
private String logToken = null;
+ @Basic
@Column(name = "sla_xml")
@Lob
- private String slaXml = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob slaXml;
@Basic
@Column(name = "name")
@@ -163,7 +165,8 @@ public class WorkflowActionBean implemen
@Basic
@Column(name = "conf")
@Lob
- private String conf = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob conf;
@Basic
@Column(name = "retries")
@@ -185,17 +188,23 @@ public class WorkflowActionBean implemen
@Column(name = "transition")
private String transition = null;
+ @Basic
@Column(name = "data")
@Lob
- private String data = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob data;
+ @Basic
@Column(name = "stats")
@Lob
- private String stats = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob stats;
+ @Basic
@Column(name = "external_child_ids")
@Lob
- private String externalChildIDs = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob externalChildIDs;
@Basic
@Column(name = "external_id")
@@ -220,7 +229,6 @@ public class WorkflowActionBean implemen
@Column(name = "error_message", length = 500)
private String errorMessage = null;
-
/**
* Default constructor.
*/
@@ -348,8 +356,8 @@ public class WorkflowActionBean implemen
* Return if the action is START_RETRY or START_MANUAL or END_RETRY or
* END_MANUAL.
*
- * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
- * END_MANUAL
+ * @return boolean true if status is START_RETRY or START_MANUAL or
+ * END_RETRY or END_MANUAL
*/
public boolean isRetryOrManual() {
return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
@@ -371,8 +379,8 @@ public class WorkflowActionBean implemen
* @return if the action is complete.
*/
public boolean isComplete() {
- return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
- getStatus() == WorkflowAction.Status.ERROR;
+ return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED
+ || getStatus() == WorkflowAction.Status.ERROR;
}
/**
@@ -407,7 +415,8 @@ public class WorkflowActionBean implemen
}
/**
- * Set a time when the action will be pending, normally a time in the future.
+ * Set a time when the action will be pending, normally a time in the
+ * future.
*
* @param pendingAge the time when the action will be pending.
*/
@@ -418,7 +427,8 @@ public class WorkflowActionBean implemen
/**
* Return the pending age of the action.
*
- * @return the pending age of the action, <code>null</code> if the action is not pending.
+ * @return the pending age of the action, <code>null</code> if the action is
+ * not pending.
*/
public Date getPendingAge() {
return DateUtils.toDate(pendingAgeTimestamp);
@@ -456,7 +466,8 @@ public class WorkflowActionBean implemen
}
/**
- * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE}
+ * Set a tracking information for an action, and set the action status to
+ * {@link Action.Status#DONE}
*
* @param externalId external ID for the action.
* @param trackerUri tracker URI for the action.
@@ -475,10 +486,12 @@ public class WorkflowActionBean implemen
}
/**
- * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE}
+ * Set the completion information for an action start. Sets the Action
+ * status to {@link Action.Status#DONE}
*
* @param externalStatus action external end status.
- * @param actionData action output data, <code>null</code> if there is no action output data.
+ * @param actionData action output data, <code>null</code> if there is no
+ * action output data.
*/
public void setExecutionData(String externalStatus, Properties actionData) {
setStatus(Status.DONE);
@@ -513,7 +526,7 @@ public class WorkflowActionBean implemen
*/
@Override
public String getExternalChildIDs() {
- return externalChildIDs;
+ return externalChildIDs == null ? null : externalChildIDs.getString();
}
/**
@@ -522,15 +535,39 @@ public class WorkflowActionBean implemen
* @param externalChildIDs as a string.
*/
public void setExternalChildIDs(String externalChildIDs) {
+ if (this.externalChildIDs == null) {
+ this.externalChildIDs = new StringBlob(externalChildIDs);
+ }
+ else {
+ this.externalChildIDs.setString(externalChildIDs);
+ }
+ }
+
+ /**
+ * Set external child ids
+ *
+ * @param externalChildIds
+ */
+ public void setExternalChildIDsBlob(StringBlob externalChildIDs) {
this.externalChildIDs = externalChildIDs;
}
/**
+ * Get external ChildIds
+ *
+ * @return
+ */
+ public StringBlob getExternalChildIDsBlob() {
+ return externalChildIDs;
+ }
+
+ /**
* Set the completion information for an action end.
*
- * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link
- * Action.Status#KILLED}
- * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
+ * @param status action status, {@link Action.Status#OK} or
+ * {@link Action.Status#ERROR} or {@link Action.Status#KILLED}
+ * @param signalValue the signal value. In most cases, the value should be
+ * OK or ERROR.
*/
public void setEndData(Status status, String signalValue) {
if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
@@ -544,7 +581,6 @@ public class WorkflowActionBean implemen
setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
}
-
/**
* Return the job Id.
*
@@ -572,24 +608,30 @@ public class WorkflowActionBean implemen
this.wfId = id;
}
- /**
- * Get sla xml
- * @return the sla xml
- */
+ public void setSlaXml(String slaXmlStr) {
+ if (this.slaXml == null) {
+ this.slaXml = new StringBlob(slaXmlStr);
+ }
+ else {
+ this.slaXml.setString(slaXmlStr);
+ }
+ }
+
public String getSlaXml() {
- return slaXml;
+ return slaXml == null ? null : slaXml.getString();
}
- /**
- * Set sla Xml
- * @param slaXml
- */
- public void setSlaXml(String slaXml) {
+ public void setSlaXmlBlob(StringBlob slaXml) {
this.slaXml = slaXml;
}
+ public StringBlob getSlaXmlBlob() {
+ return slaXml;
+ }
+
/**
* Set status of job
+ *
* @param val
*/
public void setStatus(Status val) {
@@ -603,6 +645,7 @@ public class WorkflowActionBean implemen
/**
* Set status
+ *
* @param statusStr
*/
public void setStatusStr(String statusStr) {
@@ -611,11 +654,13 @@ public class WorkflowActionBean implemen
/**
* Get status
+ *
* @return
*/
public String getStatusStr() {
return statusStr;
}
+
/**
* Return the node execution path.
*
@@ -635,8 +680,10 @@ public class WorkflowActionBean implemen
}
/**
- * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is
- * OK or ERROR.
+ * Return the signal value for the action.
+ * <p/>
+ * For decision nodes it is the choosen transition, for actions it is OK or
+ * ERROR.
*
* @return the action signal value.
*/
@@ -645,8 +692,10 @@ public class WorkflowActionBean implemen
}
/**
- * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK
- * or ERROR.
+ * Set the signal value for the action.
+ * <p/>
+ * For decision nodes it is the choosen transition, for actions it is OK or
+ * ERROR.
*
* @param signalValue the action signal value.
*/
@@ -708,7 +757,6 @@ public class WorkflowActionBean implemen
return endTimestamp;
}
-
/**
* Return the action last check time
*
@@ -738,6 +786,7 @@ public class WorkflowActionBean implemen
/**
* Set start time
+ *
* @param startTime
*/
public void setStartTime(Date startTime) {
@@ -751,13 +800,13 @@ public class WorkflowActionBean implemen
/**
* Set end time
+ *
* @param endTime
*/
public void setEndTime(Date endTime) {
this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
}
-
@SuppressWarnings("unchecked")
public JSONObject toJSONObject() {
return toJSONObject("GMT");
@@ -770,15 +819,15 @@ public class WorkflowActionBean implemen
json.put(JsonTags.WORKFLOW_ACTION_NAME, name);
json.put(JsonTags.WORKFLOW_ACTION_AUTH, cred);
json.put(JsonTags.WORKFLOW_ACTION_TYPE, type);
- json.put(JsonTags.WORKFLOW_ACTION_CONF, conf);
+ json.put(JsonTags.WORKFLOW_ACTION_CONF, getConf());
json.put(JsonTags.WORKFLOW_ACTION_STATUS, statusStr);
json.put(JsonTags.WORKFLOW_ACTION_RETRIES, (long) retries);
json.put(JsonTags.WORKFLOW_ACTION_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
json.put(JsonTags.WORKFLOW_ACTION_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
json.put(JsonTags.WORKFLOW_ACTION_TRANSITION, transition);
- json.put(JsonTags.WORKFLOW_ACTION_DATA, data);
- json.put(JsonTags.WORKFLOW_ACTION_STATS, stats);
- json.put(JsonTags.WORKFLOW_ACTION_EXTERNAL_CHILD_IDS, externalChildIDs);
+ json.put(JsonTags.WORKFLOW_ACTION_DATA, getData());
+ json.put(JsonTags.WORKFLOW_ACTION_STATS, getStats());
+ json.put(JsonTags.WORKFLOW_ACTION_EXTERNAL_CHILD_IDS, getExternalChildIDs());
json.put(JsonTags.WORKFLOW_ACTION_EXTERNAL_ID, externalId);
json.put(JsonTags.WORKFLOW_ACTION_EXTERNAL_STATUS, externalStatus);
json.put(JsonTags.WORKFLOW_ACTION_TRACKER_URI, trackerUri);
@@ -827,13 +876,26 @@ public class WorkflowActionBean implemen
@Override
public String getConf() {
- return conf;
+ return conf == null ? null : conf.getString();
}
public void setConf(String conf) {
+ if (this.conf == null) {
+ this.conf = new StringBlob(conf);
+ }
+ else {
+ this.conf.setString(conf);
+ }
+ }
+
+ public void setConfBlob(StringBlob conf) {
this.conf = conf;
}
+ public StringBlob getConfBlob() {
+ return conf;
+ }
+
@Override
public int getRetries() {
return retries;
@@ -862,7 +924,8 @@ public class WorkflowActionBean implemen
}
/**
- * Set user retry max
+ * Set user retry max
+ *
* @param retryMax
*/
public void setUserRetryMax(int retryMax) {
@@ -885,6 +948,7 @@ public class WorkflowActionBean implemen
/**
* Set transition
+ *
* @param transition
*/
public void setTransition(String transition) {
@@ -893,30 +957,58 @@ public class WorkflowActionBean implemen
@Override
public String getData() {
- return data;
+ return data == null ? null : data.getString();
}
/**
* Set data
+ *
* @param data
*/
public void setData(String data) {
+ if (this.data == null) {
+ this.data = new StringBlob(data);
+ }
+ else {
+ this.data.setString(data);
+ }
+ }
+
+ public void setDataBlob(StringBlob data) {
this.data = data;
}
+ public StringBlob getDataBlob() {
+ return data;
+ }
+
@Override
public String getStats() {
- return stats;
+ return stats == null ? null : stats.getString();
}
/**
* Set stats
+ *
* @param stats
*/
public void setStats(String stats) {
+ if (this.stats == null) {
+ this.stats = new StringBlob(stats);
+ }
+ else {
+ this.stats.setString(stats);
+ }
+ }
+
+ public void setStatsBlob(StringBlob stats) {
this.stats = stats;
}
+ public StringBlob getStatsBlob() {
+ return this.stats;
+ }
+
@Override
public String getExternalId() {
return externalId;
@@ -924,6 +1016,7 @@ public class WorkflowActionBean implemen
/**
* Set external Id
+ *
* @param externalId
*/
public void setExternalId(String externalId) {
@@ -937,6 +1030,7 @@ public class WorkflowActionBean implemen
/**
* Set external status
+ *
* @param externalStatus
*/
public void setExternalStatus(String externalStatus) {
@@ -950,6 +1044,7 @@ public class WorkflowActionBean implemen
/**
* Set tracker uri
+ *
* @param trackerUri
*/
public void setTrackerUri(String trackerUri) {
@@ -963,6 +1058,7 @@ public class WorkflowActionBean implemen
/**
* Set console URL
+ *
* @param consoleUrl
*/
public void setConsoleUrl(String consoleUrl) {
@@ -981,12 +1077,13 @@ public class WorkflowActionBean implemen
/**
* Set the error Info
+ *
* @param errorCode
* @param errorMessage
*/
public void setErrorInfo(String errorCode, String errorMessage) {
this.errorCode = errorCode;
- if(errorMessage != null && errorMessage.length() > 500){
+ if (errorMessage != null && errorMessage.length() > 500) {
errorMessage = errorMessage.substring(0, 500);
}
this.errorMessage = errorMessage;
@@ -1013,5 +1110,4 @@ public class WorkflowActionBean implemen
return array;
}
-
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java Tue Sep 10 15:27:39 2013
@@ -49,6 +49,7 @@ import javax.persistence.Transient;
import java.sql.Timestamp;
import org.apache.openjpa.persistence.jdbc.Index;
+import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@@ -113,9 +114,11 @@ public class WorkflowJobBean implements
@Id
private String id;
+ @Basic
@Column(name = "proto_action_conf")
@Lob
- private String protoActionConf = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob protoActionConf;
@Basic
@Column(name = "log_token")
@@ -149,15 +152,17 @@ public class WorkflowJobBean implements
@Column(name = "last_modified_time")
private java.sql.Timestamp lastModifiedTimestamp = null;
- // @Basic(fetch = FetchType.LAZY)
- // @Column(name="wfinstance",columnDefinition="blob")
+ @Basic
@Column(name = "wf_instance")
@Lob
- private byte[] wfInstance = null;
+ @Strategy("org.apache.oozie.executor.jpa.BinaryBlobValueHandler")
+ private BinaryBlob wfInstance ;
+ @Basic
@Column(name = "sla_xml")
@Lob
- private String slaXml = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob slaXml;
@Basic
@@ -168,9 +173,11 @@ public class WorkflowJobBean implements
@Column(name = "app_path")
private String appPath = null;
+ @Basic
@Column(name = "conf")
@Lob
- private String conf = null;
+ @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
+ private StringBlob conf;
@Basic
@Column(name = "user_name")
@@ -223,7 +230,7 @@ public class WorkflowJobBean implements
WritableUtils.writeStr(dataOutput, getGroup());
dataOutput.writeInt(getRun());
WritableUtils.writeStr(dataOutput, logToken);
- WritableUtils.writeStr(dataOutput, protoActionConf);
+ WritableUtils.writeStr(dataOutput, getProtoActionConf());
}
/**
@@ -260,9 +267,8 @@ public class WorkflowJobBean implements
setGroup(WritableUtils.readStr(dataInput));
setRun(dataInput.readInt());
logToken = WritableUtils.readStr(dataInput);
- protoActionConf = WritableUtils.readStr(dataInput);
+ setProtoActionConf(WritableUtils.readStr(dataInput));
setExternalId(getExternalId());
- setProtoActionConf(protoActionConf);
}
public boolean inTerminalState() {
@@ -288,39 +294,70 @@ public class WorkflowJobBean implements
}
public String getSlaXml() {
- return slaXml;
+ return slaXml == null ? null : slaXml.getString();
}
public void setSlaXml(String slaXml) {
+ if (this.slaXml == null) {
+ this.slaXml = new StringBlob(slaXml);
+ }
+ else {
+ this.slaXml.setString(slaXml);
+ }
+ }
+
+ public void setSlaXmlBlob(StringBlob slaXml) {
this.slaXml = slaXml;
}
+ public StringBlob getSlaXmlBlob() {
+ return this.slaXml;
+ }
+
public WorkflowInstance getWorkflowInstance() {
- return get(this.wfInstance);
+ return wfInstance == null ? null : get(wfInstance.getBytes());
}
- public byte[] getWfInstance() {
- return wfInstance;
+ public BinaryBlob getWfInstanceBlob() {
+ return this.wfInstance;
}
public void setWorkflowInstance(WorkflowInstance workflowInstance) {
- setWfInstance(workflowInstance);
+ if (this.wfInstance == null) {
+ this.wfInstance = new BinaryBlob(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance), true);
+ }
+ else {
+ this.wfInstance.setBytes(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance));
+ }
}
- public void setWfInstance(byte[] wfInstance) {
+ public void setWfInstanceBlob(BinaryBlob wfInstance) {
this.wfInstance = wfInstance;
}
- public void setWfInstance(WorkflowInstance wfInstance) {
- this.wfInstance = WritableUtils.toByteArray((LiteWorkflowInstance) wfInstance);
- }
-
public String getProtoActionConf() {
- return protoActionConf;
+ return protoActionConf == null ? null : protoActionConf.getString();
}
public void setProtoActionConf(String protoActionConf) {
- this.protoActionConf = protoActionConf;
+ if (this.protoActionConf == null) {
+ this.protoActionConf = new StringBlob(protoActionConf);
+ }
+ else {
+ this.protoActionConf.setString(protoActionConf);
+ }
+ }
+
+ public void setProtoActionConfBlob (StringBlob protoBytes) {
+ this.protoActionConf = protoBytes;
+ }
+
+ public StringBlob getProtoActionConfBlob() {
+ return this.protoActionConf;
+ }
+
+ public String getlogToken() {
+ return logToken;
}
public Timestamp getLastModifiedTimestamp() {
@@ -457,13 +494,26 @@ public class WorkflowJobBean implements
}
public String getConf() {
- return conf;
+ return conf == null ? null : conf.getString();
}
public void setConf(String conf) {
+ if (this.conf == null) {
+ this.conf = new StringBlob(conf);
+ }
+ else {
+ this.conf.setString(conf);
+ }
+ }
+
+ public void setConfBlob(StringBlob conf) {
this.conf = conf;
}
+ public StringBlob getConfBlob() {
+ return this.conf;
+ }
+
public String getUser() {
return user;
}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/compression/CodecFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/compression/CodecFactory.java?rev=1521527&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/compression/CodecFactory.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/compression/CodecFactory.java Tue Sep 10 15:27:39 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.compression;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Utility class for maintaining list of codecs and providing facility
+ * for compressing and decompressing.
+ *
+ */
+public class CodecFactory {
+ private static final Map<String, CompressionCodec> REGISTERED = new HashMap<String, CompressionCodec>();
+ public static final String COMPRESSION_CODECS = "oozie.compression.codecs";
+ public static final String COMPRESSION_OUTPUT_CODEC = "oozie.output.compression.codec";
+ private static CompressionCodec outputCompressionCodec;
+ public static final String COMPRESSION_MAGIC_DATA = "OBJ";
+ public static final String COMPRESSION_KEY_HEADER = "codec";
+ public static final String UTF_8_ENCODING = "UTF-8";
+ private static boolean isEnabled;
+ private static XLog LOG = XLog.getLog(CodecFactory.class);;
+ private static byte[] headerBytes;
+
+ /**
+ * Initialize the codec factory to maintain list of codecs
+ * @param conf
+ * @throws Exception
+ */
+ public static void initialize(Configuration conf) throws Exception {
+ String outputCompressionStr = conf.get(COMPRESSION_OUTPUT_CODEC);
+ if (outputCompressionStr == null || outputCompressionStr.trim().equalsIgnoreCase("NONE") ||
+ outputCompressionStr.trim().equalsIgnoreCase("")) {
+ isEnabled = false;
+ }
+ else {
+ outputCompressionStr = outputCompressionStr.trim();
+ isEnabled = true;
+ }
+ String[] outputCompressionCodecs = conf.getStrings(COMPRESSION_CODECS);
+ for (String comp : outputCompressionCodecs) {
+ parseCompressionConfig(comp);
+ }
+ if (isEnabled) {
+ if (REGISTERED.get(GzipCompressionCodec.CODEC_NAME) == null) {
+ REGISTERED.put(GzipCompressionCodec.CODEC_NAME, new GzipCompressionCodec());
+ }
+ outputCompressionCodec = REGISTERED.get(outputCompressionStr);
+ if (outputCompressionCodec == null) {
+ throw new RuntimeException("No codec class found for codec " + outputCompressionStr);
+ }
+ }
+ LOG.info("Using " + outputCompressionStr + " as output compression codec");
+
+ // Initialize header bytes
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream daos = new DataOutputStream(baos);
+ // magic data
+ daos.write(COMPRESSION_MAGIC_DATA.getBytes(UTF_8_ENCODING));
+ // version
+ daos.writeInt(1);
+ // no of key value pairs
+ daos.writeInt(1);
+ daos.writeUTF(COMPRESSION_KEY_HEADER);
+ daos.writeUTF(outputCompressionStr);
+ daos.close();
+ headerBytes = baos.toByteArray();
+
+ }
+
+ private static void parseCompressionConfig(String comp) throws Exception {
+ String[] compression = comp.split("=", 2);
+ if (compression.length == 2) {
+ String key = compression[0];
+ String value = compression[1];
+ REGISTERED.put(key, (CompressionCodec) Class.forName(value).newInstance());
+ LOG.info("Adding [{0}] to list of output compression codecs", key);
+ }
+ else {
+ throw new IllegalArgumentException("Property " + comp + " not in key=value format"
+ + "; output compression cannot be enabled");
+ }
+ }
+
+ private static CompressionCodec getCodec(String key) {
+ CompressionCodec codec = REGISTERED.get(key);
+ if (codec != null) {
+ return codec;
+ }
+ else {
+ throw new RuntimeException("No compression algo found corresponding to " + key);
+ }
+ }
+
+ /**
+ * Check whether compression is enabled or not
+ * @return true if compression is enabled
+ */
+ public static boolean isCompressionEnabled() {
+ return isEnabled;
+ }
+
+ /**
+ * Get decompression codec after reading from stream
+ * @param dais the input stream
+ * @return the decompression codec
+ * @throws IOException
+ */
+ public static CompressionCodec getDeCompressionCodec(DataInputStream dais) throws IOException {
+ byte[] buffer = new byte[COMPRESSION_MAGIC_DATA.length()];
+ dais.read(buffer, 0, buffer.length);
+ Map<String, String> compressionProps = new HashMap<String, String>();
+ try {
+ if (new String(buffer, UTF_8_ENCODING).equals(COMPRESSION_MAGIC_DATA)) {
+ // read Version; need to handle if multiple versions are
+ // supported
+ dais.readInt();
+ // read no of key value pairs; need to handle if more than one
+ dais.readInt();
+ compressionProps.put(dais.readUTF(), dais.readUTF());
+ }
+ else {
+ dais.reset();
+ return null;
+ }
+ }
+ catch (UnsupportedEncodingException ex) {
+ throw new RuntimeException(ex);
+ }
+ return getCodec(compressionProps.get(COMPRESSION_KEY_HEADER));
+ }
+
+ /**
+ * Get output compression codec
+ * @return the compression codec
+ */
+ public static CompressionCodec getCompressionCodec() {
+ return outputCompressionCodec;
+ }
+
+ /**
+ * Get header bytes
+ * @return the header bytes
+ */
+ public static byte[] getHeaderBytes() {
+ return headerBytes;
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/compression/CompressionCodec.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/compression/CompressionCodec.java?rev=1521527&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/compression/CompressionCodec.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/compression/CompressionCodec.java Tue Sep 10 15:27:39 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.compression;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+public interface CompressionCodec {
+
+ public byte[] compressBytes(byte[] header, byte[] data) throws IOException;
+
+ public byte[] compressString(byte[] header, String data) throws IOException;
+
+ public String decompressToString(DataInputStream dais) throws IOException;
+
+ public byte[] decompressToBytes(DataInputStream dais) throws IOException;
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/compression/GzipCompressionCodec.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/compression/GzipCompressionCodec.java?rev=1521527&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/compression/GzipCompressionCodec.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/compression/GzipCompressionCodec.java Tue Sep 10 15:27:39 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.compression;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Class to compress and decompress data using Gzip codec
+ *
+ */
+public class GzipCompressionCodec implements CompressionCodec {
+
+ public static final String CODEC_NAME = "gz";
+
+ public byte[] compressBytes(byte[] header, byte[] data) throws IOException {
+ ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(2000);
+ byteOutput.write(header);
+ GZIPOutputStream gzipOut = new GZIPOutputStream(byteOutput);
+ gzipOut.write(data);
+ gzipOut.close();
+ return byteOutput.toByteArray();
+ }
+
+ public byte[] compressString(byte[] header, String data) throws IOException {
+ ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(1000);
+ byteOutput.write(header);
+ GZIPOutputStream gzipOut = new GZIPOutputStream(byteOutput);
+ gzipOut.write(data.getBytes(CodecFactory.UTF_8_ENCODING));
+ gzipOut.close();
+ return byteOutput.toByteArray();
+ }
+
+ public String decompressToString(DataInputStream dais) throws IOException {
+ GZIPInputStream gzipIn = new GZIPInputStream(dais);
+ String decompress = IOUtils.toString(gzipIn, CodecFactory.UTF_8_ENCODING);
+ gzipIn.close();
+ return decompress;
+ }
+
+ public byte[] decompressToBytes(DataInputStream dais) throws IOException {
+ GZIPInputStream gzipIn = new GZIPInputStream(dais);
+ byte[] decompress = IOUtils.toByteArray(gzipIn);
+ gzipIn.close();
+ return decompress;
+ }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BinaryBlobValueHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BinaryBlobValueHandler.java?rev=1521527&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BinaryBlobValueHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BinaryBlobValueHandler.java Tue Sep 10 15:27:39 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.BinaryBlob;
+import org.apache.openjpa.jdbc.kernel.JDBCStore;
+import org.apache.openjpa.jdbc.meta.ValueMapping;
+
+@SuppressWarnings("serial")
+public class BinaryBlobValueHandler extends org.apache.openjpa.jdbc.meta.strats.ByteArrayValueHandler {
+
+ private static final BinaryBlobValueHandler _instance = new BinaryBlobValueHandler();
+
+ /**
+ * Singleton instance.
+ */
+ public static BinaryBlobValueHandler getInstance() {
+ return _instance;
+ }
+
+ public Object toDataStoreValue(ValueMapping vm, Object val, JDBCStore store) {
+ if (val == null) {
+ return null;
+ }
+ return ((BinaryBlob) val).getRawBlob();
+ }
+
+ public Object toObjectValue(ValueMapping vm, Object val) {
+ if (val == null) {
+ return null;
+ }
+ return new BinaryBlob((byte[]) val, false);
+ }
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -38,6 +38,7 @@ import org.apache.oozie.BulkResponseInfo
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
@@ -263,7 +264,7 @@ public class BulkJPAExecutor implements
actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
}
if (arr[9] != null) {
- actionBean.setMissingDependencies((String) arr[9]);
+ actionBean.setMissingDependenciesBlob((StringBlob) arr[9]);
}
if (arr[10] != null) {
coordBean.setId((String) arr[10]);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobInfoGetJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobInfoGetJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobInfoGetJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobInfoGetJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,6 +27,7 @@ import javax.persistence.Query;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.BundleJobInfo;
+import org.apache.oozie.StringBlob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.BundleJob.Timeunit;
import org.apache.oozie.store.StoreStatusFilter;
@@ -139,7 +140,7 @@ public class BundleJobInfoGetJPAExecutor
bean.setAppPath((String) arr[2]);
}
if (arr[3] != null) {
- bean.setConf((String) arr[3]);
+ bean.setConfBlob((StringBlob) arr[3]);
}
if (arr[4] != null) {
bean.setStatus(Job.Status.valueOf((String) arr[4]));
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java Tue Sep 10 15:27:39 2013
@@ -74,13 +74,13 @@ public class BundleJobQueryExecutor exte
case UPDATE_BUNDLE_JOB:
query.setParameter("appName", bjBean.getAppName());
query.setParameter("appPath", bjBean.getAppPath());
- query.setParameter("conf", bjBean.getConf());
+ query.setParameter("conf", bjBean.getConfBlob());
query.setParameter("timeOut", bjBean.getTimeout());
query.setParameter("createdTimestamp", bjBean.getCreatedTimestamp());
query.setParameter("endTimestamp", bjBean.getEndTimestamp());
- query.setParameter("jobXml", bjBean.getJobXml());
+ query.setParameter("jobXml", bjBean.getJobXmlBlob());
query.setParameter("lastModifiedTimestamp", bjBean.getLastModifiedTimestamp());
- query.setParameter("origJobXml", bjBean.getOrigJobXml());
+ query.setParameter("origJobXml", bjBean.getOrigJobXmlBlob());
query.setParameter("startTimestamp", bjBean.getstartTimestamp());
query.setParameter("status", bjBean.getStatus().toString());
query.setParameter("timeUnit", bjBean.getTimeUnit());
@@ -133,13 +133,14 @@ public class BundleJobQueryExecutor exte
public Query getSelectQuery(BundleJobQuery namedQuery, EntityManager em, Object... parameters)
throws JPAExecutorException {
Query query = em.createNamedQuery(namedQuery.name());
- switch (namedQuery) {
+ BundleJobQuery bjQuery = (BundleJobQuery) namedQuery;
+ switch (bjQuery) {
case GET_BUNDLE_JOB:
query.setParameter("id", parameters[0]);
break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
- + namedQuery.name());
+ + bjQuery.name());
}
return query;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetNeedStartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetNeedStartJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetNeedStartJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetNeedStartJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -47,18 +47,15 @@ public class BundleJobsGetNeedStartJPAEx
@SuppressWarnings("unchecked")
public List<BundleJobBean> execute(EntityManager em) throws JPAExecutorException {
List<BundleJobBean> bjBeans;
- List<BundleJobBean> jobList = new ArrayList<BundleJobBean>();
try {
Query q = em.createNamedQuery("GET_BUNDLE_JOBS_NEED_START");
q.setParameter("currentTime", new Timestamp(date.getTime()));
bjBeans = q.getResultList();
- for (BundleJobBean j : bjBeans) {
- jobList.add(j);
- }
+
}
catch (Exception e) {
throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
}
- return jobList;
+ return bjBeans;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetPausedJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetPausedJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetPausedJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetPausedJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -45,20 +45,17 @@ public class BundleJobsGetPausedJPAExecu
@SuppressWarnings("unchecked")
public List<BundleJobBean> execute(EntityManager em) throws JPAExecutorException {
List<BundleJobBean> bjBeans;
- List<BundleJobBean> jobList = new ArrayList<BundleJobBean>();
try {
Query q = em.createNamedQuery("GET_BUNDLE_JOBS_PAUSED");
if (limit > 0) {
q.setMaxResults(limit);
}
bjBeans = q.getResultList();
- for (BundleJobBean j : bjBeans) {
- jobList.add(j);
- }
+
}
catch (Exception e) {
throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
}
- return jobList;
+ return bjBeans;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetRunningOrPendingJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetRunningOrPendingJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetRunningOrPendingJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetRunningOrPendingJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -51,20 +51,16 @@ public class BundleJobsGetRunningOrPendi
@SuppressWarnings("unchecked")
public List<BundleJobBean> execute(EntityManager em) throws JPAExecutorException {
List<BundleJobBean> bjBeans;
- List<BundleJobBean> jobList = new ArrayList<BundleJobBean>();
try {
Query q = em.createNamedQuery("GET_BUNDLE_JOBS_RUNNING_OR_PENDING");
if (limit > 0) {
q.setMaxResults(limit);
}
bjBeans = q.getResultList();
- for (BundleJobBean j : bjBeans) {
- jobList.add(j);
- }
}
catch (Exception e) {
throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
}
- return jobList;
+ return bjBeans;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetUnpausedJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetUnpausedJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetUnpausedJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsGetUnpausedJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -45,20 +45,16 @@ public class BundleJobsGetUnpausedJPAExe
@SuppressWarnings("unchecked")
public List<BundleJobBean> execute(EntityManager em) throws JPAExecutorException {
List<BundleJobBean> bjBeans;
- List<BundleJobBean> jobList = new ArrayList<BundleJobBean>();
try {
Query q = em.createNamedQuery("GET_BUNDLE_JOBS_UNPAUSED");
if (limit > 0) {
q.setMaxResults(limit);
}
bjBeans = q.getResultList();
- for (BundleJobBean j : bjBeans) {
- jobList.add(j);
- }
}
catch (Exception e) {
throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
}
- return jobList;
+ return bjBeans;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -25,6 +25,7 @@ import javax.persistence.Query;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
@@ -88,7 +89,7 @@ public class CoordActionGetForCheckJPAEx
bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[5]));
}
if (arr[6] != null){
- bean.setSlaXml((String) arr[6]);
+ bean.setSlaXmlBlob((StringBlob) arr[6]);
}
if (arr[7] != null){
bean.setNominalTime(DateUtils.toDate((Timestamp) arr[7]));
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -25,6 +25,7 @@ import javax.persistence.Query;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
@@ -92,7 +93,7 @@ public class CoordActionGetForExternalId
bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[5]));
}
if (arr[6] != null){
- bean.setSlaXml((String) arr[6]);
+ bean.setSlaXmlBlob((StringBlob) arr[6]);
}
if (arr[7] != null){
bean.setNominalTime(DateUtils.toDate((Timestamp) arr[7]));
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInfoJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInfoJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInfoJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInfoJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -25,6 +25,7 @@ import javax.persistence.Query;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
@@ -88,7 +89,7 @@ public class CoordActionGetForInfoJPAExe
}
if (caBeans != null && caBeans.size() > 0) {
- CoordinatorActionBean bean = getBeanForCoordAction(caBeans.get(0));
+ CoordinatorActionBean bean = caBeans.get(0);
return bean;
}
else {
@@ -99,34 +100,6 @@ public class CoordActionGetForInfoJPAExe
}
- private CoordinatorActionBean getBeanForCoordAction(CoordinatorActionBean a){
- if (a != null) {
- CoordinatorActionBean action = new CoordinatorActionBean();
- action.setId(a.getId());
- action.setActionNumber(a.getActionNumber());
- action.setActionXml(a.getActionXml());
- action.setConsoleUrl(a.getConsoleUrl());
- action.setCreatedConf(a.getCreatedConf());
- action.setExternalStatus(a.getExternalStatus());
- action.setMissingDependencies(a.getMissingDependencies());
- action.setPushMissingDependencies(a.getPushMissingDependencies());
- action.setRunConf(a.getRunConf());
- action.setTimeOut(a.getTimeOut());
- action.setTrackerUri(a.getTrackerUri());
- action.setType(a.getType());
- action.setCreatedTime(a.getCreatedTime());
- action.setExternalId(a.getExternalId());
- action.setJobId(a.getJobId());
- action.setLastModifiedTime(a.getLastModifiedTime());
- action.setNominalTime(a.getNominalTime());
- action.setSlaXml(a.getSlaXml());
- action.setStatus(a.getStatus());
- action.setPending(a.getPending());
- action.setRerunTime(a.getRerunTime());
- return action;
- }
- return null;
- }
private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) {
CoordinatorActionBean bean = new CoordinatorActionBean();
@@ -170,10 +143,10 @@ public class CoordActionGetForInfoJPAExe
bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[12]));
}
if (arr[13] != null) {
- bean.setMissingDependencies((String) arr[13]);
+ bean.setMissingDependenciesBlob((StringBlob) arr[13]);
}
if (arr[14] != null) {
- bean.setPushMissingDependencies((String) arr[14]);
+ bean.setPushMissingDependenciesBlob((StringBlob) arr[14]);
}
return bean;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java?rev=1521527&r1=1521526&r2=1521527&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java Tue Sep 10 15:27:39 2013
@@ -23,6 +23,7 @@ import javax.persistence.Query;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
@@ -77,7 +78,7 @@ public class CoordActionGetForInputCheck
bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
}
if (arr[3] != null) {
- bean.setRunConf((String) arr[3]);
+ bean.setRunConfBlob((StringBlob) arr[3]);
}
if (arr[4] != null) {
bean.setNominalTime(DateUtils.toDate((Timestamp) arr[4]));
@@ -86,13 +87,13 @@ public class CoordActionGetForInputCheck
bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[5]));
}
if (arr[6] != null) {
- bean.setActionXml((String) arr[6]);
+ bean.setActionXmlBlob((StringBlob) arr[6]);
}
if (arr[7] != null) {
- bean.setMissingDependencies((String) arr[7]);
+ bean.setMissingDependenciesBlob((StringBlob) arr[7]);
}
if (arr[8] != null) {
- bean.setPushMissingDependencies((String) arr[8]);
+ bean.setPushMissingDependenciesBlob((StringBlob) arr[8]);
}
if (arr[9] != null) {
bean.setTimeOut((Integer) arr[9]);