Posted to commits@ode.apache.org by bo...@apache.org on 2009/10/15 01:37:10 UTC

svn commit: r825343 - in /ode/branches/APACHE_ODE_1.X: Rakefile dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/GZipDataType.java dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HLargeData.java

Author: boisvert
Date: Wed Oct 14 23:37:10 2009
New Revision: 825343

URL: http://svn.apache.org/viewvc?rev=825343&view=rev
Log:
ODE-682: Compress data stored in the LARGE_DATA table

Added:
    ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/GZipDataType.java
Modified:
    ode/branches/APACHE_ODE_1.X/Rakefile
    ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HLargeData.java

Modified: ode/branches/APACHE_ODE_1.X/Rakefile
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/Rakefile?rev=825343&r1=825342&r2=825343&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/Rakefile (original)
+++ ode/branches/APACHE_ODE_1.X/Rakefile Wed Oct 14 23:37:10 2009
@@ -386,6 +386,15 @@
         process_instance_hbm.insert(process_instance_hbm.index("class=\"org.apache.ode.daohib.bpel.hobj.HProcess\"") - 1, "not-found=\"ignore\" ")
         File.open(process_instance_hbm_file, "w") { |f| f << process_instance_hbm }
       end
+      # add gzip typedef (doclet does not support it correctly)
+      typedef = <<-END
+        <typedef name="gzip" class="org.apache.ode.daohib.bpel.hobj.GZipDataType"/>
+      END
+      Dir[project.path_to("target/classes")+"/**/*.hbm.xml"].select { |f| File.read(f).include? 'type="gzip"' }.each do |hbm|
+        content = File.read(hbm)
+        content.insert(content.index("<class") - 1, typedef)
+        File.open(hbm, "w") { |f| f << content }
+      end
     }
 
     test.with project("bpel-epr"), BACKPORT, COMMONS.collections, COMMONS.lang, HSQLDB,
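
(Effect of the step above: every generated *.hbm.xml under target/classes that
maps a property with type="gzip" has the typedef declaration inserted just
before its <class> element, so Hibernate can resolve the custom "gzip" type
when the mappings are loaded.)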

Added: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/GZipDataType.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/GZipDataType.java?rev=825343&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/GZipDataType.java (added)
+++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/GZipDataType.java Wed Oct 14 23:37:10 2009
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.daohib.bpel.hobj;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Types;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.hibernate.usertype.UserType;
+
+/**
+ * Custom Hibernate datatype that transparently GZIP-compresses byte arrays
+ * to save disk space and reduce I/O.
+ */
+public class GZipDataType implements UserType {
+    private static final Log log = LogFactory.getLog(GZipDataType.class);
+
+    public static final int[] SQL_TYPES = new int[] { Types.BLOB };
+
+    public static final Class RETURNED_CLASS = byte[].class;
+
+    /** For backward compatibility with non-zipped data, prefix the gzip stream with a magic sequence */
+    public static final byte[] GZIP_PREFIX = new byte[] { 0x01, 0x02, 0x03, 0x04, 0x03, 0x02, 0x01, 0x00 };
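+    // (A gzip stream's own magic number is only two bytes, 0x1f 0x8b, which a
+    // legacy uncompressed value could start with by coincidence; this longer
+    // 8-byte prefix makes misdetecting old rows as compressed practically
+    // impossible.)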
+
+    // Compression statistics
+    private static long _totalBytesBefore = 0;
+    private static long _totalBytesAfter = 0;
+    private static volatile long _lastLogTime = 0;
+    private static final Object STATS_LOCK = new Object();
+
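+    /** Compression can be disabled by setting the system property
+     *  -Dorg.apache.ode.daohib.bpel.hobj.GZipDataType.enabled=false. Reads stay
+     *  transparent either way, because nullSafeGet() detects the prefix per value. */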
+    private static volatile boolean _compressionEnabled =
+            System.getProperty("org.apache.ode.daohib.bpel.hobj.GZipDataType.enabled", "true").equalsIgnoreCase("true");
+
+    /** Reconstruct an object from the cacheable representation */
+    public Object assemble(Serializable cached, Object owner) {
+        // the cacheable representation is the byte[] value itself
+        return cached;
+    }
+
+    /** Transform the object into its cacheable representation */
+    public Serializable disassemble(Object value) {
+        // as-is
+        return (Serializable) value;
+    }
+
+    /** Return a deep copy of the persistent state */
+    public Object deepCopy(Object value) {
+        if (value == null) return null;
+        return ((byte[]) value).clone();
+    }
+
+    /** Compare two instances of the class mapped by this type for persistence "equality". */
+    public boolean equals(Object x, Object y) {
+        byte[] buf1 = (byte[]) x;
+        byte[] buf2 = (byte[]) y;
+        if (buf1 == buf2) return true;
+        if (buf1 == null || buf2 == null) return false;
+        if (buf1.length != buf2.length) return false;
+        for (int i=0; i<buf1.length; i++) {
+            if (buf1[i] != buf2[i]) return false;
+        }
+        return true;
+    }
+
+    /** Get a hashcode for the instance, consistent with persistence "equality" */
+    public int hashCode(Object x) {
+        if (x == null) return 0;
+        byte[] buf = (byte[]) x;
+        int hash = 0;
+        for (int i=0; i<buf.length; i++) {
+            hash += buf[i];
+        }
+        return hash;
+    }
+
+    /** Are objects of this type mutable? */
+    public boolean isMutable() {
+        return false;
+    }
+
+    /** Retrieve an instance of the mapped class from a JDBC resultset. */
+    public Object nullSafeGet(ResultSet rs, String[] names, Object owner) throws SQLException {
+        if (names.length != 1) throw new IllegalStateException("Expected a single column name instead of "+names.length);
+        byte[] buf = rs.getBytes(names[0]);
+        if (buf == null) {
+            return null;
+        }
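+        // legacy rows (and values stored uncompressed) carry no prefix and
+        // fall through unchanged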
+        if (buf.length >= GZIP_PREFIX.length) {
+            boolean gzip = true;
+            for (int i=0; i<GZIP_PREFIX.length; i++) {
+                if (buf[i] != GZIP_PREFIX[i]) {
+                    gzip = false;
+                    break;
+                }
+            }
+            if (gzip) {
+                buf = gunzip(new ByteArrayInputStream(buf, GZIP_PREFIX.length, buf.length-GZIP_PREFIX.length));
+            }
+        }
+        return buf;
+    }
+
+    /** Write an instance of the mapped class to a prepared statement. */
+    public void nullSafeSet(PreparedStatement st, Object value, int index) throws SQLException {
+        byte[] buf = (byte[]) value;
+        if (buf == null) {
+            st.setBytes(index, null);
+            return;
+        }
+        synchronized (STATS_LOCK) {
+            if (_totalBytesBefore > Integer.MAX_VALUE) {
+                // prevent overflow - renormalize to a percent value
+                _totalBytesAfter = _totalBytesAfter*100/_totalBytesBefore;
+                _totalBytesBefore = 100;
+            }
+            _totalBytesBefore += buf.length;
+        }
+        // only try to zip if we have more than 100 bytes
+        if (_compressionEnabled && buf.length > 100) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream(buf.length);
+            baos.write(GZIP_PREFIX, 0, GZIP_PREFIX.length);
+            gzip(buf, baos);
+            byte[] zipped = baos.toByteArray();
+            // only use the zipped representation if it is more than 1% smaller
+            if (zipped.length*100/buf.length < 99) {
+                buf = zipped;
+            }
+        }
+        synchronized (STATS_LOCK) {
+            _totalBytesAfter += buf.length;
+        }
+        if (log.isDebugEnabled()) {
+            long now = System.currentTimeMillis();
+            if (_lastLogTime+5000 < now && _totalBytesBefore > 0) {
+                log.debug("Average compression ratio: "+ (_totalBytesAfter*100/_totalBytesBefore)+"%");
+                _lastLogTime = now;
+            }
+        }
+        st.setBytes(index, buf);
+    }
+
+    /** During merge, replace the existing (target) value in the entity we are
+     *  merging into with the new (original) value from the detached entity we
+     *  are merging from.
+     */
+    public Object replace(Object original, Object target, Object owner) {
+        return original;
+    }
+
+    /** The class returned by nullSafeGet(). */
+    public Class returnedClass() {
+        return RETURNED_CLASS;
+    }
+
+    /** Return the SQL type codes for the columns mapped by this type. */
+    public int[] sqlTypes() {
+        return SQL_TYPES;
+    }
+
+    /**
+     * Compress (using gzip algorithm) a byte array into an output stream.
+     */
+    public static void gzip(byte[] content, OutputStream out) {
+        try {
+            GZIPOutputStream zip = new GZIPOutputStream(out);
+            zip.write(content, 0, content.length);
+            zip.finish();
+            zip.close();
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Decompress (using the gzip algorithm) an input stream into a byte array.
+     */
+    public static byte[] gunzip(InputStream input) {
+        try {
+            GZIPInputStream unzip = new GZIPInputStream(input);
+            ByteArrayOutputStream baos = new ByteArrayOutputStream(32*1024);
+            byte[] buf = new byte[4096];
+            int len;
+            while ((len = unzip.read(buf)) > 0) {
+                baos.write(buf, 0, len);
+            }
+            unzip.close();
+            return baos.toByteArray();
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+}
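
For illustration (not part of this commit): a minimal sketch of the round-trip
framing performed by this type, assuming GZipDataType is on the classpath.
nullSafeSet() persists either the raw bytes or GZIP_PREFIX followed by a gzip
stream, and nullSafeGet() undoes exactly that:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;
    import org.apache.ode.daohib.bpel.hobj.GZipDataType;

    public class GZipDataTypeRoundTrip {
        public static void main(String[] args) {
            byte[] original = new byte[1000]; // highly compressible payload (all zeros)

            // what nullSafeSet() stores: magic prefix, then the gzip stream
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            baos.write(GZipDataType.GZIP_PREFIX, 0, GZipDataType.GZIP_PREFIX.length);
            GZipDataType.gzip(original, baos);
            byte[] stored = baos.toByteArray();

            // what nullSafeGet() does on read: skip the prefix, gunzip the rest
            byte[] restored = GZipDataType.gunzip(new ByteArrayInputStream(
                    stored, GZipDataType.GZIP_PREFIX.length,
                    stored.length - GZipDataType.GZIP_PREFIX.length));

            System.out.println("round-trip ok: " + Arrays.equals(original, restored));
            System.out.println("stored " + stored.length + " bytes for " + original.length);
        }
    }

Because detection happens per value, rows written before this change carry no
prefix and are returned as-is; they are only rewritten in compressed form the
next time they are persisted.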

Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HLargeData.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HLargeData.java?rev=825343&r1=825342&r2=825343&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HLargeData.java (original)
+++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HLargeData.java Wed Oct 14 23:37:10 2009
@@ -23,7 +23,11 @@
  * Used to store large data sets in a single table. When an HObject
  * instance needs to store large binary or text data as part of its
  * state, it must hold a reference to an instance of this class.
+ *
  * @hibernate.class table="LARGE_DATA"
+ *
+ * @hibernate.typedef name="gzip" class="org.apache.ode.daohib.bpel.hobj.GZipDataType"
+ *
  * @hibernate.query name="SELECT_ACTIVITY_RECOVERY_LDATA_IDS_BY_INSTANCES" query="select a.details.id from HActivityRecovery as a where a.instance in (:instances) and a.details is not null"
  * @hibernate.query name="SELECT_JACOB_LDATA_IDS_BY_INSTANCES" query="select i.jacobState.id from HProcessInstance as i where i in (:instances) and i.jacobState is not null"
  * @hibernate.query name="SELECT_MESSAGE_LDATA_IDS_BY_INSTANCES_1" query="select m.messageData.id from HMessage m where m.messageExchange.instance in (:instances) and m.messageData is not null"
@@ -57,10 +61,10 @@
     public final static String SELECT_PARTNER_LINK_LDATA_IDS_BY_INSTANCES_1 = "SELECT_PARTNER_LINK_LDATA_IDS_BY_INSTANCES_1";
     public final static String SELECT_PARTNER_LINK_LDATA_IDS_BY_INSTANCES_2 = "SELECT_PARTNER_LINK_LDATA_IDS_BY_INSTANCES_2";
     public final static String SELECT_FAULT_LDATA_IDS_BY_INSTANCE_IDS = "SELECT_FAULT_LDATA_IDS_BY_INSTANCE_IDS";
-    
+
     public final static String SELECT_MESSAGE_LDATA_IDS_BY_MEX_1 = "SELECT_MESSAGE_LDATA_IDS_BY_MEX_1";
     public final static String SELECT_MESSAGE_LDATA_IDS_BY_MEX_2 = "SELECT_MESSAGE_LDATA_IDS_BY_MEX_2";
-	
+
     private byte[] binary = null;
 
     public HLargeData() {
@@ -78,14 +82,14 @@
     }
 
     /**
-     * @hibernate.property type="binary" length="2G"
+     * @hibernate.property type="gzip"
      *
      * @hibernate.column name="BIN_DATA" sql-type="blob(2G)"
      */
     public byte[] getBinary() {
         return binary;
     }
-                                                                    
+
     public void setBinary(byte[] binary) {
         this.binary = binary;
     }