You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2013/02/14 23:55:06 UTC
svn commit: r1446378 [2/3] - in
/uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src:
main/java/org/apache/uima/cas/impl/ main/java/org/apache/uima/util/impl/
test/java/org/apache/uima/cas/impl/
Added: uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
URL: http://svn.apache.org/viewvc/uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java?rev=1446378&view=auto
==============================================================================
--- uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java (added)
+++ uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java Thu Feb 14 22:55:06 2013
@@ -0,0 +1,2991 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.cas.impl;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.uima.cas.AbstractCas;
+import org.apache.uima.cas.CASRuntimeException;
+import org.apache.uima.cas.impl.SlotKinds.SlotKind;
+import org.apache.uima.cas.impl.TypeSystemImpl.TypeInfo;
+import org.apache.uima.internal.util.IntListIterator;
+import org.apache.uima.internal.util.IntPointerIterator;
+import org.apache.uima.internal.util.IntVector;
+import org.apache.uima.internal.util.rb_trees.IntArrayRBT;
+import org.apache.uima.internal.util.rb_trees.IntRedBlackTree;
+import org.apache.uima.jcas.JCas;
+import org.apache.uima.util.impl.DataIO;
+import org.apache.uima.util.impl.OptimizeStrings;
+import org.apache.uima.util.impl.SerializationMeasures;
+
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.*;
+
+/**
+ * User callable serialization and deserialization of the CAS in a compressed Binary Format
+ *
+ * This serializes/deserializes the state of the CAS. It has the capability to map type systems,
+ * so the sending and receiving type systems do not have to be the same.
+ * - types and features are matched by name, and features must have the same range (slot kind)
+ * - types and/or features in one type system not in the other are skipped over
+ *
+ * Header specifies to reader the format, and the compression level.
+ *
+ * How to Serialize:
+ *
+ * 1) create an instance of this class
+ * 2) call serialize() to serialize the CAS
+ * 3) If doing serialization to a target from which you expect to receive back a delta CAS,
+ * keep this object and reuse it for deserializing the delta CAS.
+ *
+ * Otherwise, you cannot reuse this object.
+ *
+ * TypeSystemImpl objects are lazily augmented by customized TypeInfo instances for each type encountered in
+ * serializing or deserializing. These are preserved for future calls, so their setup / initialization is only
+ * needed the first time.
+ *
+ * TypeSystemImpl objects are also lazily augmented by typeMappers for individual different target typesystems;
+ * these too are preserved and reused on future calls.
+ *
+ * Compressed Binary CASes are designed to be "self-describing" -
+ * The format of the compressed binary CAS, including version info,
+ * is inserted at the beginning so that a proper deserialization method can be automatically chosen.
+ *
+ * Compressed Binary format supports type system mapping. Types in the source which are not in the target
+ * (or vice versa) are omitted. Types with "extra" features have their extra features omitted
+ * (or on deserialization, they are set to their default value - null, or 0, etc.).
+ *
+ * How to Deserialize:
+ *
+ * 1) get an appropriate CAS to deserialize into. For delta CAS, it does not have to be empty, but it must
+ * be the originating CAS from which the delta was produced.
+ * 2) call CASImpl: cas.reinit(inputStream) This is the existing method
+ * for binary deserialization, and it now handles this compressed version, too.
+ * Delta cas is also supported. This form requires both source and target type systems to be exactly the same.
+ *
+ * An alternative interface for Deserialization with type system mapping:
+ * 1) get an appropriate CAS to deserialize into (as above).
+ * 2) create (or reuse, if delta deserializing) an instance of this class -> xxx
+ * 3) call xxx.deserialize(receivingCas, inputStream, targetTypeSystemImpl)
+ * where the targetTypeSystem is the type system of the cas being deserialized
+ *
+ * Compression/Decompression
+ * Works in two stages:
+ * application of Zip/Unzip to particular sub-collections of CAS data,
+ * grouped according to similar data distribution
+ * collection of like kinds of data (to make the zipping more effective)
+ * There can be up to ~20 of these collections, such as
+ * control info, float-exponents, string chars
+ * Deserialization:
+ * Read all bytes,
+ * create separate ByteArrayInputStreams for each segment
+ * create appropriate unzip data input streams for these
+ *
+ * API design
+ * Data that is slow (expensive) to compute:
+ * extra type system info - lazily created and added to shared TypeSystemImpl object
+ * set up per type actually referenced
+ * mapper for type system - lazily created and added to shared TypeSystemImpl object
+ * in identity-map cache (size limit = 10 per source type system?) - key is target typesystemimpl.
+ * Defaulting:
+ * flags: doMeasurements, compressLevel, CompressStrategy
+ * Per serialize call: cas, output, [target ts], [mark for delta]
+ * Per deserialize call: cas, input, [target ts]
+ *
+ * CASImpl has instance method with defaulting args for serialization.
+ * CASImpl has reinit which works with compressed binary serialization objects
+ * if no type mapping
+ * If type mapping, (new BinaryCasSerDes6(...)).deserialize(in-stream, [targetTypeSystem])
+ *
+ * Use Cases, filtering and delta
+ * **************************************************************************
+ * * (de)serialize * filter? * delta? * Use case
+ * **************************************************************************
+ * * serialize * N * N * Saving a Cas,
+ * * * * * sending Cas to svc with identical ts
+ * **************************************************************************
+ * * serialize * Y * N * sending Cas to svc with
+ * * * * * different ts (a subset)
+ * **************************************************************************
+ * * serialize * N * Y * returning Cas to client
+ * * * * * (?? saving just a delta to disk??)
+ * **************************************************************************
+ * * serialize * Y * Y * NOT SUPPORTED (not needed)
+ * **************************************************************************
+ * * deserialize * N * N * reading/(receiving) CAS, identical TS
+ * **************************************************************************
+ * * deserialize * Y * N * reading/(receiving) CAS, different TS
+ * **************************************************************************
+ * * deserialize * N * Y * receiving CAS, identical TS,
+ * **************************************************************************
+ * * deserialize * Y * Y * receiving CAS, different TS (tgt a feature subset)
+ * **************************************************************************
+ */
+public class BinaryCasSerDes6 {
+
+ /**
+ * Version of the serializer/deserializer, used to allow deserialization of
+ * older versions
+ *
+ * Version 0 - initial SVN checkin
+ * Version 1 - changes to support CasTypeSystemMapper
+ */
+ private static final int VERSION = 1;
+
+ private static final long DBL_1 = Double.doubleToLongBits(1D);
+
+ /**
+  * Compression alternatives: the zip effort level.
+  * Each constant carries the corresponding {@link Deflater} level constant in {@link #lvl}.
+  */
+ public enum CompressLevel {
+   /** {@link Deflater#NO_COMPRESSION} */
+   None(Deflater.NO_COMPRESSION),
+   /** {@link Deflater#BEST_SPEED} */
+   Fast(Deflater.BEST_SPEED),
+   /** {@link Deflater#DEFAULT_COMPRESSION} */
+   Default(Deflater.DEFAULT_COMPRESSION),
+   /** {@link Deflater#BEST_COMPRESSION} */
+   Best(Deflater.BEST_COMPRESSION),
+   ;
+
+   /** the value to hand to {@code java.util.zip.Deflater} as its level */
+   public final int lvl;
+
+   CompressLevel(int deflaterLevel) {
+     lvl = deflaterLevel;
+   }
+ }
+
+ /**
+  * Compression alternatives: the zip strategy.
+  * Each constant carries the corresponding {@link Deflater} strategy constant in {@link #strat}.
+  */
+ public enum CompressStrat {
+   /** {@link Deflater#DEFAULT_STRATEGY} */
+   Default(Deflater.DEFAULT_STRATEGY),
+   /** {@link Deflater#FILTERED} */
+   Filtered(Deflater.FILTERED),
+   /** {@link Deflater#HUFFMAN_ONLY} */
+   HuffmanOnly(Deflater.HUFFMAN_ONLY),
+   ;
+
+   /** the value to hand to {@code java.util.zip.Deflater#setStrategy} */
+   public final int strat;
+
+   CompressStrat(int deflaterStrategy) {
+     strat = deflaterStrategy;
+   }
+ }
+ /**
+  * Info reused for multiple serializations of the same cas to multiple targets, or
+  * for deserializing with a delta cas.
+  * Holds the reachable FSs, the sequence maps, and the target type system.
+  * Reusable in serialization, and for deserialization of a delta cas.
+  *
+  * The constructor is private; the enclosing class creates instances in getReuseInfo().
+  */
+ public static class ReuseInfo {
+   /** ordered set of FSs found in indexes or linked from other found FSs */
+   private final IntArrayRBT foundFSs;
+   private final CasSeqAddrMaps fsStartIndexes;
+   private final TypeSystemImpl tgtTs;
+
+   private ReuseInfo(
+       IntArrayRBT reachableFSs,
+       CasSeqAddrMaps seqAddrMaps,
+       TypeSystemImpl targetTypeSystem) {
+     foundFSs = reachableFSs;
+     fsStartIndexes = seqAddrMaps;
+     tgtTs = targetTypeSystem;
+   }
+ }
+
+ /**
+  * Packages this instance's found-FS set, FS start-index maps and target type system
+  * into a new {@link ReuseInfo}.
+  * @return a new ReuseInfo that references (does not copy) the current values
+  */
+ public ReuseInfo getReuseInfo() {
+   final ReuseInfo reusable = new ReuseInfo(foundFSs, fsStartIndexes, tgtTs);
+   return reusable;
+ }
+
+ // speedups - SlotKind ordinals cached as ints, usable as array indexes
+ private static final int arrayLength_i = Slot_ArrayLength.ordinal();
+ private static final int heapRef_i = Slot_HeapRef.ordinal();
+ private static final int int_i = Slot_Int.ordinal();
+ private static final int byte_i = Slot_Byte.ordinal();
+ private static final int short_i = Slot_Short.ordinal();
+ private static final int typeCode_i = Slot_TypeCode.ordinal();
+ private static final int strOffset_i = Slot_StrOffset.ordinal();
+ private static final int strLength_i = Slot_StrLength.ordinal();
+ private static final int long_High_i = Slot_Long_High.ordinal();
+ private static final int long_Low_i = Slot_Long_Low.ordinal();
+ private static final int float_Mantissa_Sign_i = Slot_Float_Mantissa_Sign.ordinal();
+ private static final int float_Exponent_i = Slot_Float_Exponent.ordinal();
+ private static final int double_Mantissa_Sign_i = Slot_Double_Mantissa_Sign.ordinal();
+ private static final int double_Exponent_i = Slot_Double_Exponent.ordinal();
+ private static final int fsIndexes_i = Slot_FsIndexes.ordinal();
+ private static final int strChars_i = Slot_StrChars.ordinal();
+ private static final int control_i = Slot_Control.ordinal();
+ private static final int strSeg_i = Slot_StrSeg.ordinal();
+
+ /**
+ * Things set up for one instance of this class
+ */
+ final private TypeSystemImpl ts;
+ final private CompressLevel compressLevel;
+ final private CompressStrat compressStrategy;
+
+ /**
+ * Things that are used by common routines among serialization and deserialization
+ */
+ private boolean isTypeMappingCmn;
+ private CasTypeSystemMapper typeMapperCmn;
+
+ /*****************************************************
+ * Things for both serialization and Deserialization
+ *****************************************************/
+ final private CASImpl cas; // cas being serialized
+ private int[] heap; // main heap, can't be final because grow replaces it
+ final private StringHeap stringHeapObj;
+ final private LongHeap longHeapObj;
+ final private ShortHeap shortHeapObj;
+ final private ByteHeap byteHeapObj;
+
+ private int heapStart;
+ private int heapEnd; // set when deserializing
+ private int totalMappedHeapSize = 0; // heapEnd - heapStart, but with FS that don't exist in the target type system deleted
+
+ final private boolean isSerializingDelta; // if true, there is a marker indicating the start spot(s)
+ private boolean isDelta;
+ private boolean isReadingDelta;
+ final private MarkerImpl mark; // the mark to serialize from
+
+ final private CasSeqAddrMaps fsStartIndexes;
+ final private boolean doMeasurements; // if true, doing measurements
+
+ private OptimizeStrings os;
+ private boolean only1CommonString; // true if only one common string
+
+ final private TypeSystemImpl tgtTs;
+
+ private TypeInfo typeInfo; // type info for the current type being serialized/deserialized
+ final private CasTypeSystemMapper typeMapper;
+ final private boolean isTypeMapping;
+
+ final private int[] iPrevHeapArray; // index of previous instance of this typecode in heap, by typecode
+ private int iPrevHeap; // 0 or heap addr of previous instance of current type
+
+ private IntArrayRBT foundFSs; // ordered set of FSs found in indexes or linked from other found FSs
+// final private int[] typeCodeHisto = new int[ts.getTypeArraySize()];
+
+ /*********************************
+ * Things for just serialization
+ *********************************/
+ private DataOutputStream serializedOut; // where to write out the serialized result
+
+ final private SerializationMeasures sm; // null or serialization measurements
+ final private ByteArrayOutputStream[] baosZipSources = new ByteArrayOutputStream[NBR_SLOT_KIND_ZIP_STREAMS]; // lazily created, indexed by SlotKind.i
+ final private DataOutputStream[] dosZipSources = new DataOutputStream[NBR_SLOT_KIND_ZIP_STREAMS]; // lazily created, indexed by SlotKind.i
+
+ final private int[] estimatedZipSize = new int[NBR_SLOT_KIND_ZIP_STREAMS]; // one entry for each output stream kind
+
+ // speedups
+
+ // any use of these means caller handles measurement
+ // some of these are never used, because the current impl
+ // is using the _i form to get measurements done
+ private DataOutputStream arrayLength_dos;
+ private DataOutputStream heapRef_dos;
+ private DataOutputStream int_dos;
+ private DataOutputStream byte_dos;
+ private DataOutputStream short_dos;
+ private DataOutputStream typeCode_dos;
+ private DataOutputStream strOffset_dos;
+ private DataOutputStream strLength_dos;
+ private DataOutputStream long_High_dos;
+ private DataOutputStream long_Low_dos;
+ private DataOutputStream float_Mantissa_Sign_dos;
+ private DataOutputStream float_Exponent_dos;
+ private DataOutputStream double_Mantissa_Sign_dos;
+ private DataOutputStream double_Exponent_dos;
+ private DataOutputStream fsIndexes_dos;
+ private DataOutputStream strChars_dos;
+ private DataOutputStream control_dos;
+ private DataOutputStream strSeg_dos;
+
+ /**********************************
+ * Things for just deserialization
+ **********************************/
+
+ private DataInputStream deserIn;
+ private int version;
+
+ final private DataInputStream[] dataInputs = new DataInputStream[NBR_SLOT_KIND_ZIP_STREAMS];
+ final private Inflater[] inflaters = new Inflater[NBR_SLOT_KIND_ZIP_STREAMS];
+
+ private IntVector fixupsNeeded; // for deserialization, the "fixups" for relative heap refs needed
+ private int stringTableOffset;
+
+ /**
+ * These indexes remember sharable common values in aux heaps
+ * Values must be in aux heap, but not part of arrays there
+ * so that rules out boolean, byte, and shorts
+ */
+ private int longZeroIndex = -1; // also used for double 0 index
+ private int double1Index = -1;
+
+ private String[] readCommonString;
+
+ // speedups
+
+ private DataInputStream arrayLength_dis;
+ private DataInputStream heapRef_dis;
+ private DataInputStream int_dis;
+ private DataInputStream byte_dis;
+ private DataInputStream short_dis;
+ private DataInputStream typeCode_dis;
+ private DataInputStream strOffset_dis;
+ private DataInputStream strLength_dis;
+ private DataInputStream long_High_dis;
+ private DataInputStream long_Low_dis;
+ private DataInputStream float_Mantissa_Sign_dis;
+ private DataInputStream float_Exponent_dis;
+ private DataInputStream double_Mantissa_Sign_dis;
+ private DataInputStream double_Exponent_dis;
+ private DataInputStream fsIndexes_dis;
+ private DataInputStream strChars_dis;
+ private DataInputStream control_dis;
+ private DataInputStream strSeg_dis;
+
+
+ /**
+  * Sets up an instance for serializing, or deserializing into, one CAS.
+  *
+  * @param aCas the CAS (or JCas); the base CAS obtained from it is what is used,
+  *             and its type system becomes the source type system
+  * @param mark null, or the marker to serialize from; non-null turns on delta serialization
+  * @param tgtTs null, or the target type system; used to obtain the type system mapper
+  * @param rfs null, or reuse info from a previous instance; when non-null its found-FS set,
+  *            FS start-index maps and target type system are adopted
+  * @param doMeasurements true if measurements should be collected
+  * @param compressLevel the zip compression level
+  * @param compressStrategy the zip compression strategy
+  * @throws CASRuntimeException (INVALID_MARKER) if mark is non-null and not valid
+  */
+ public BinaryCasSerDes6(
+     AbstractCas aCas,
+     MarkerImpl mark,
+     TypeSystemImpl tgtTs,
+     ReuseInfo rfs,
+     boolean doMeasurements,
+     CompressLevel compressLevel,
+     CompressStrat compressStrategy) {
+   cas = ((CASImpl) ((aCas instanceof JCas) ? ((JCas)aCas).getCas(): aCas)).getBaseCAS();
+
+   this.ts = cas.getTypeSystemImpl();
+   this.mark = mark;
+   if (null != mark && !mark.isValid() ) {
+     throw new CASRuntimeException(
+         CASRuntimeException.INVALID_MARKER, new String[] { "Invalid Marker." });
+   }
+
+   // sm is non-null exactly when doMeasurements is true
+   this.doMeasurements = doMeasurements;
+   this.sm = doMeasurements ? new SerializationMeasures() : null;
+
+   isDelta = isSerializingDelta = (mark != null);
+   // NOTE(review): the mapper is built from the tgtTs argument, even when rfs supplies
+   // the target type system stored in this.tgtTs below - confirm this is intended.
+   typeMapperCmn = typeMapper = ts.getTypeSystemMapper(tgtTs);
+   isTypeMappingCmn = isTypeMapping = (null != typeMapper);
+
+   heap = cas.getHeap().heap;
+   heapEnd = cas.getHeap().getCellsUsed();
+   heapStart = isSerializingDelta ? mark.getNextFSId() : 0;
+
+   stringHeapObj = cas.getStringHeap();
+   longHeapObj = cas.getLongHeap();
+   shortHeapObj = cas.getShortHeap();
+   byteHeapObj = cas.getByteHeap();
+
+   iPrevHeapArray = new int[ts.getTypeArraySize()];
+
+   this.compressLevel = compressLevel;
+   this.compressStrategy = compressStrategy;
+   if (null != rfs) {
+     foundFSs = rfs.foundFSs;
+     fsStartIndexes = rfs.fsStartIndexes;
+     this.tgtTs = rfs.tgtTs;
+   } else {
+     // foundFSs stays null here; serialize() computes it when it is null
+     fsStartIndexes = new CasSeqAddrMaps();
+     this.tgtTs = tgtTs;
+   }
+ }
+
+ /**
+  * Convenience form: no mark, no target type system, no reuse info, no measurements,
+  * default compression level and strategy.
+  * @param cas the CAS (or JCas) to work with
+  */
+ public BinaryCasSerDes6(AbstractCas cas) {
+   this(cas,
+        null,                    // mark
+        null,                    // tgtTs
+        null,                    // rfs
+        false,                   // doMeasurements
+        CompressLevel.Default,
+        CompressStrat.Default);
+ }
+
+ /**
+  * Convenience form with a mark: no target type system, no reuse info, no measurements,
+  * default compression level and strategy.
+  * @param cas the CAS (or JCas) to work with
+  * @param mark null, or the marker to serialize from
+  */
+ public BinaryCasSerDes6(AbstractCas cas, MarkerImpl mark) {
+   this(cas,
+        mark,
+        null,                    // tgtTs
+        null,                    // rfs
+        false,                   // doMeasurements
+        CompressLevel.Default,
+        CompressStrat.Default);
+ }
+
+ /**
+  * Convenience form with a mark and a target type system: no reuse info, no measurements,
+  * default compression level and strategy.
+  * @param cas the CAS (or JCas) to work with
+  * @param mark null, or the marker to serialize from
+  * @param tgtTs null, or the target type system
+  */
+ public BinaryCasSerDes6(AbstractCas cas, MarkerImpl mark, TypeSystemImpl tgtTs) {
+   this(cas,
+        mark,
+        tgtTs,
+        null,                    // rfs
+        false,                   // doMeasurements
+        CompressLevel.Default,
+        CompressStrat.Default);
+ }
+
+ /**
+  * Convenience form with reuse info: no mark, no target type system argument,
+  * no measurements, default compression level and strategy.
+  * @param cas the CAS (or JCas) to work with
+  * @param rfs null, or reuse info obtained from a previous instance
+  */
+ public BinaryCasSerDes6(AbstractCas cas, ReuseInfo rfs) {
+   this(cas,
+        null,                    // mark
+        null,                    // tgtTs
+        rfs,
+        false,                   // doMeasurements
+        CompressLevel.Default,
+        CompressStrat.Default);
+ }
+
+ /*********************************************************************************************
+ * S e r i a l i z e r Class for sharing variables among routines
+ * Class instantiated once per serialization
+ * Multiple serializations in parallel supported, with multiple instances of this
+ *********************************************************************************************/
+
+
+ /*************************************************************************************
+ * S E R I A L I Z E
+ * Serializes the CAS given at construction: writes the header, the string info,
+ * one entry per found FS (in foundFSs iteration order), the indexed FSs,
+ * for delta serialization the modified FSs, and finally the zipped streams.
+ * @param out the destination, handed as-is to setupOutputStreams
+ * @return null or serialization measurements (depending on setting of doMeasurements)
+ * @throws IOException
+ * @throws UnsupportedOperationException if delta serializing and a target type system is set
+ *************************************************************************************/
+ public SerializationMeasures serialize(Object out) throws IOException {
+ // delta serialization combined with type system filtering is not supported
+ if (isSerializingDelta && (tgtTs != null)) {
+ throw new UnsupportedOperationException("Can't do Delta Serialization with different target TS");
+ }
+ setupOutputStreams(out);
+
+ if (doMeasurements) {
+ System.out.println(printCasInfo(cas));
+ sm.origAuxBytes = cas.getByteHeap().getSize();
+ sm.origAuxShorts = cas.getShortHeap().getSize() * 2;
+ sm.origAuxLongs = cas.getLongHeap().getSize() * 8;
+ // holds the start time for now; converted to elapsed time just before returning
+ sm.totalTime = System.currentTimeMillis();
+ }
+
+ writeHeader();
+ os = new OptimizeStrings(doMeasurements);
+
+ /******************************************************************
+ * Find all FSs to be serialized via the indexes
+ * including those FSs referenced
+ ******************************************************************/
+
+ // foundFSs is already set when reuse info was supplied to the constructor
+ if (foundFSs == null) {
+ processIndexedFeatureStructures(cas, false /* compute ref'd FSs, no write */);
+ }
+
+ /***************************
+ * Prepare to walk main heap
+ * We prescan the main heap and
+ * 1) identify any types that should be skipped
+ * building a source and target fsStartIndexes table
+ * 2) add all strings to the string table,
+ * for strings above the mark
+ ***************************/
+
+ // scan thru all fs and save their offsets in the heap
+ // to allow conversion from addr to sequential fs numbers
+ // Also, compute sequential maps for non-equal type systems
+ // As a side effect, also add all strings that are included
+ // in the target type system to the set to be optimized.
+ totalMappedHeapSize = initFsStartIndexes();
+ if (heapStart == 0) {
+ totalMappedHeapSize++; // include the null at the start
+ heapStart = 1; // slot 0 not serialized, it's null / 0
+ }
+
+ /**************************
+ * Strings
+ **************************/
+
+ os.optimize();
+ writeStringInfo();
+
+ /***************************
+ * Prepare to walk main heap
+ ***************************/
+ writeVnumber(control_dos, totalMappedHeapSize);
+ if (doMeasurements) {
+ sm.statDetails[Slot_MainHeap.ordinal()].original = (1 + heapEnd - heapStart) * 4;
+ }
+
+ // start with "no previous instance" for every type code
+ Arrays.fill(iPrevHeapArray, 0);
+
+ /***************************
+ * walk main heap
+ ***************************/
+
+ final IntListIterator foundFSsIterator = foundFSs.iterator();
+ int iHeap;
+ while (foundFSsIterator.hasNext()) {
+ iHeap = foundFSsIterator.next();
+// for (int iHeap = heapStart; iHeap < heapEnd; iHeap += incrToNextFs(heap, iHeap, typeInfo)) {
+ final int tCode = heap[iHeap]; // get type code
+ final int mappedTypeCode = isTypeMapping ? typeMapper.mapTypeCodeSrc2Tgt(tCode) : tCode;
+ if (mappedTypeCode == 0) { // means no corresponding type in target system
+ continue;
+ }
+
+ // typeInfo and iPrevHeap are instance fields read by the serializeXxx helpers called below
+ typeInfo = ts.getTypeInfo(tCode);
+ iPrevHeap = iPrevHeapArray[tCode];
+
+ writeVnumber(typeCode_dos, mappedTypeCode);
+
+ if (typeInfo.isHeapStoredArray) {
+ serializeHeapStoredArray(iHeap);
+ } else if (typeInfo.isArray) {
+ serializeNonHeapStoredArray(iHeap);
+ } else {
+ if (isTypeMapping) {
+ // Serialize out in the order the features are in the target
+ final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(tCode);
+ for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
+ final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1; // add one for origin 1
+ if (featOffsetInSrc == 0) {
+ throw new RuntimeException(); // never happen because for serialization, target is never a superset of features of src
+ }
+ serializeByKind(iHeap, featOffsetInSrc);
+ }
+ } else {
+ // no mapping: all slots, in source order; offsets are origin 1
+ for (int i = 1; i < typeInfo.slotKinds.length + 1; i++) {
+ serializeByKind(iHeap, i);
+ }
+ }
+ }
+
+ iPrevHeapArray[tCode] = iHeap; // make this one the "prev" one for subsequent testing
+ if (doMeasurements) {
+ // NOTE(review): this counts the length of tCode, but the value written above is
+ // mappedTypeCode - the two can differ when type mapping; confirm which is intended
+ sm.statDetails[typeCode_i].incr(DataIO.lengthVnumber(tCode));
+ sm.mainHeapFSs ++;
+ }
+ } // end of heap walk
+
+ processIndexedFeatureStructures(cas, true /* pass 2 */);
+
+ if (isSerializingDelta) {
+ (new SerializeModifiedFSs()).serializeModifiedFSs();
+ }
+
+ collectAndZip();
+
+ if (doMeasurements) {
+ sm.totalTime = System.currentTimeMillis() - sm.totalTime;
+ }
+ return sm;
+ }
+
+ /**
+  * Writes an array whose elements are stored in the main heap: first its length,
+  * then each element, encoded according to the element slot kind of the current typeInfo.
+  * Int-like elements are written as diffs, the first one against the first element of the
+  * previous array of the same type (or 0 if there is none or it is empty).
+  *
+  * @param iHeap main heap index of the array (length at iHeap + 1, elements from iHeap + 2)
+  * @throws IOException passed through from the underlying writes
+  */
+ private void serializeHeapStoredArray(int iHeap) throws IOException {
+   final int nbrElements = serializeArrayLength(iHeap);
+   if (nbrElements == 0) {
+     return;   // an empty array has nothing after its length
+   }
+
+   final SlotKind elementKind = typeInfo.slotKinds[1];
+   final int firstElement = iHeap + 2;
+   final int pastLastElement = firstElement + nbrElements;
+
+   switch (elementKind) {
+     case Slot_HeapRef:
+     case Slot_Int:
+     case Slot_Short: {
+       int prev;
+       if (iPrevHeap == 0 || heap[iPrevHeap + 1] == 0) {
+         prev = 0;                     // no previous array, or previous array is empty
+       } else {
+         prev = heap[iPrevHeap + 2];   // previous array's 1st element
+       }
+       final int kindOrdinal = elementKind.ordinal();
+       for (int at = firstElement; at < pastLastElement; at++) {
+         prev = writeIntOrHeapRef(kindOrdinal, at, prev);
+       }
+       break;
+     }
+     case Slot_Float: {
+       int at = firstElement;
+       while (at < pastLastElement) {
+         writeFloat(heap[at]);
+         at++;
+       }
+       break;
+     }
+     case Slot_StrRef: {
+       int at = firstElement;
+       while (at < pastLastElement) {
+         writeString(stringHeapObj.getStringForCode(heap[at]));
+         at++;
+       }
+       break;
+     }
+     default:
+       throw new RuntimeException("internal error");
+   }
+ }
+
+ /**
+  * Writes the main heap value at index as a diff against prev.
+  * @param kind ordinal of the slot kind, passed on to writeDiff
+  * @param index main heap index of the value to write
+  * @param prev the value to diff against
+  * @return the heap value just written, for use as the next prev
+  */
+ private int writeIntOrHeapRef(int kind, int index, int prev) throws IOException {
+   final int current = heap[index];
+   writeDiff(kind, current, prev);
+   return current;
+ }
+
+ /**
+  * Writes the long whose long-heap reference is held in the main heap at index.
+  * @param index main heap index holding the reference into the long heap
+  * @param prev the value passed to writeLong as the previous value
+  * @return the long value just written
+  */
+ private long writeLongFromHeapIndex(int index, long prev) throws IOException {
+   final int longHeapRef = heap[index];
+   final long current = longHeapObj.getHeapValue(longHeapRef);
+   writeLong(current, prev);
+   return current;
+ }
+
+ /**
+  * Writes an array whose elements are not stored in the main heap: first its length, then
+  * (if not empty) the elements, via the writeFromXxxArray helper selected by the slot kind
+  * at offset 2 of the current typeInfo. The main heap cell at iHeap + 2 is what is handed
+  * to that helper, together with the length.
+  *
+  * @param iHeap main heap index of the array
+  * @throws IOException passed through from the underlying writes
+  */
+ private void serializeNonHeapStoredArray(int iHeap) throws IOException {
+   final int nbrElements = serializeArrayLength(iHeap);
+   if (nbrElements == 0) {
+     return;
+   }
+
+   final SlotKind auxKind = typeInfo.getSlotKind(2);
+   switch (auxKind) {
+     case Slot_BooleanRef:
+     case Slot_ByteRef: {
+       writeFromByteArray(auxKind, heap[iHeap + 2], nbrElements);
+       if (doMeasurements) {
+         sm.statDetails[byte_i].incr(1);
+         sm.origAuxByteArrayRefs += 4;
+       }
+       break;
+     }
+     case Slot_ShortRef: {
+       writeFromShortArray(heap[iHeap + 2], nbrElements);
+       if (doMeasurements) {
+         sm.origAuxShortArrayRefs += 4;
+       }
+       break;
+     }
+     case Slot_LongRef:
+     case Slot_DoubleRef: {
+       writeFromLongArray(auxKind, heap[iHeap + 2], nbrElements);
+       if (doMeasurements) {
+         sm.origAuxLongArrayRefs += 4;
+       }
+       break;
+     }
+     default:
+       throw new RuntimeException();
+   }
+ }
+
+ private void serializeByKind(int iHeap, int offset) throws IOException {
+ SlotKind kind = typeInfo.getSlotKind(offset);
+ switch (kind) {
+ //Slot_Int, Slot_Float, Slot_Boolean, Slot_Byte, Slot_Short
+ case Slot_Int: case Slot_Short: case Slot_HeapRef:
+ serializeDiffWithPrevTypeSlot(kind, iHeap, offset);
+ break;
+ case Slot_Float:
+ writeFloat(heap[iHeap + offset]);
+ break;
+ case Slot_Boolean: case Slot_Byte:
+ byte_dos.write(heap[iHeap + offset]);
+ break;
+ case Slot_StrRef:
+ writeString(stringHeapObj.getStringForCode(heap[iHeap + offset]));
+ break;
+ case Slot_LongRef:
+ writeLongFromHeapIndex(iHeap + offset,
+ (iPrevHeap == 0) ?
+ 0L :
+ longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
+ break;
+ case Slot_DoubleRef:
+ writeDouble(longHeapObj.getHeapValue(heap[iHeap + offset]));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ } // end of switch
+ }
+
+ /**
+  * Writes the length of the array at iHeap (taken from main heap cell iHeap + 1)
+  * to the array-length stream.
+  * @param iHeap main heap index of the array
+  * @return the array length
+  */
+ private int serializeArrayLength(int iHeap) throws IOException {
+   final int nbrElements = heap[iHeap + 1];
+   writeVnumber(arrayLength_i, nbrElements);
+   return nbrElements;
+ }
+
+ private void serializeDiffWithPrevTypeSlot(SlotKind kind, int iHeap, int offset) throws IOException {
+ int prev = (iPrevHeap == 0) ? 0 : heap[iPrevHeap + offset];
+ writeDiff(kind.ordinal(), heap[iHeap + offset], prev);
+ }
+
+ /**
+ * Method:
+ * write with deflation into a single byte array stream
+ * skip if not worth deflating
+ * skip the Slot_Control stream
+ * record in the Slot_Control stream, for each deflated stream:
+ * the Slot index
+ * the number of compressed bytes
+ * the number of uncompressed bytes
+ * add to header:
+ * nbr of compressed entries
+ * the Slot_Control stream size
+ * the Slot_Control stream
+ * all the zipped streams
+ *
+ * @throws IOException
+ */
+ private void collectAndZip() throws IOException {
+ ByteArrayOutputStream baosZipped = new ByteArrayOutputStream(4096);
+ Deflater deflater = new Deflater(compressLevel.lvl, true);
+ deflater.setStrategy(compressStrategy.strat);
+ int nbrEntries = 0;
+
+ List<Integer> idxAndLen = new ArrayList<Integer>();
+
+ for (int i = 0; i < baosZipSources.length; i++) {
+ ByteArrayOutputStream baos = baosZipSources[i];
+ if (baos != null) {
+ nbrEntries ++;
+ dosZipSources[i].close();
+ long startTime = System.currentTimeMillis();
+ int zipBufSize = Math.max(1024, baos.size() / 100);
+ deflater.reset();
+ DeflaterOutputStream cds = new DeflaterOutputStream(baosZipped, deflater, zipBufSize);
+ baos.writeTo(cds);
+ cds.close();
+ idxAndLen.add(i);
+ if (doMeasurements) {
+ idxAndLen.add((int)(sm.statDetails[i].afterZip = deflater.getBytesWritten()));
+ idxAndLen.add((int)(sm.statDetails[i].beforeZip = deflater.getBytesRead()));
+ sm.statDetails[i].zipTime = System.currentTimeMillis() - startTime;
+ } else {
+ idxAndLen.add((int)deflater.getBytesWritten());
+ idxAndLen.add((int)deflater.getBytesRead());
+ }
+ }
+ }
+ serializedOut.writeInt(nbrEntries); // write number of entries
+ for (int i = 0; i < idxAndLen.size();) {
+ serializedOut.write(idxAndLen.get(i++));
+ serializedOut.writeInt(idxAndLen.get(i++));
+ serializedOut.writeInt(idxAndLen.get(i++));
+ }
+ baosZipped.writeTo(serializedOut); // write Compressed info
+ }
+
+ private void writeLong(long v, long prev) throws IOException {
+ writeDiff(long_High_i, (int)(v >>> 32), (int)(prev >>> 32));
+ writeDiff(long_Low_i, (int)v, (int)prev);
+ }
+
+ /**
+  * String encoding
+  * What is written to the string-length stream:
+  * 0 - used for null, no offset written
+  * otherwise a sign-encoded value (see encodeIntSign):
+  * negative - the value returned by the string optimizer when it is below 0, no offset written
+  * 1 - used for "", no offset written
+  * (string length + 1) - used for any other string
+  *
+  * For the last case the offset is also written (string-offset stream), and,
+  * unless there is only one common string, the common string index (string-segment stream).
+  *
+  * @param s the string to write, may be null
+  * @throws RuntimeException if the string length is Integer.MAX_VALUE
+  */
+ private void writeString(final String s) throws IOException {
+   // case 1: null
+   if (s == null) {
+     writeVnumber(strLength_dos, 0);
+     if (doMeasurements) {
+       sm.statDetails[strLength_i].incr(1);
+     }
+     return;
+   }
+
+   // case 2: the optimizer answers with a negative value
+   final int optimizerIndex = os.getIndexOrSeqIndex(s);
+   if (optimizerIndex < 0) {
+     final int encoded = encodeIntSign(optimizerIndex);
+     writeVnumber(strLength_dos, encoded);
+     if (doMeasurements) {
+       sm.statDetails[strLength_i].incr(DataIO.lengthVnumber(encoded));
+     }
+     return;
+   }
+
+   // case 3: the empty string
+   final int charCount = s.length();
+   if (charCount == 0) {
+     writeVnumber(strLength_dos, encodeIntSign(1));
+     if (doMeasurements) {
+       sm.statDetails[strLength_i].incr(1);
+     }
+     return;
+   }
+
+   if (charCount == Integer.MAX_VALUE) {
+     throw new RuntimeException("Cannot serialize string of Integer.MAX_VALUE length - too large.");
+   }
+
+   // case 4: ordinary string - offset, then length + 1 (sign encoded)
+   final int strOffset = os.getOffset(optimizerIndex);
+   final int encodedLength = encodeIntSign(charCount + 1); // all lengths sign encoded because of above
+   writeVnumber(strOffset_dos, strOffset);
+   writeVnumber(strLength_dos, encodedLength);
+   if (doMeasurements) {
+     sm.statDetails[strOffset_i].incr(DataIO.lengthVnumber(strOffset));
+     sm.statDetails[strLength_i].incr(DataIO.lengthVnumber(encodedLength));
+   }
+
+   if (only1CommonString) {
+     return;   // segment index is only written when there are several common strings
+   }
+   final int commonStringIndex = os.getCommonStringIndex(optimizerIndex);
+   writeVnumber(strSeg_dos, commonStringIndex);
+   if (doMeasurements) {
+     sm.statDetails[strSeg_i].incr(DataIO.lengthVnumber(commonStringIndex));
+   }
+ }
+
+ /**
+  * Writes a float given as its raw int bits.
+  *
+  * Need to support NAN sets,
+  * 0x7fc.... for NAN
+  * 0xff8.... for NAN, negative infinity
+  * 0x7f8 for NAN, positive infinity
+  *
+  * Because 0 occurs frequently, we reserve
+  * exp of 0 for the value 0: raw 0 is written as a single 0 byte on the exponent stream.
+  * Otherwise the 8 exponent bits + 1 go to the exponent stream, and the
+  * bit-reversed 23 mantissa bits, shifted left 1 with the sign in the low bit,
+  * go to the mantissa/sign stream.
+  *
+  * @param raw the raw int bits of the float
+  */
+
+ private void writeFloat(int raw) throws IOException {
+   if (raw == 0) {
+     writeUnsignedByte(float_Exponent_dos, 0);
+     if (doMeasurements) {
+       sm.statDetails[float_Exponent_i].incr(1);
+     }
+     return;
+   }
+
+   final int exponentBits = (raw >>> 23) & 0xff;
+   final int shiftedExponent = exponentBits + 1; // because we reserve 0, see above
+
+   final int mantissaBits = raw & 0x007fffff;
+   final int reversedMantissa = Integer.reverse(mantissaBits << 9);
+   final int signBit = (raw < 0) ? 1 : 0;
+   final int mantissaAndSign = (reversedMantissa << 1) + signBit;
+
+   writeVnumber(float_Exponent_dos, shiftedExponent);
+   writeVnumber(float_Mantissa_Sign_dos, mantissaAndSign);
+   if (doMeasurements) {
+     sm.statDetails[float_Exponent_i].incr(DataIO.lengthVnumber(shiftedExponent));
+     sm.statDetails[float_Mantissa_Sign_i].incr(DataIO.lengthVnumber(mantissaAndSign));
+   }
+ }
+
+ /**
+  * Writes an int as a variable-length number to the stream registered for
+  * the given slot-kind index and, when measuring, records the encoded length.
+  *
+  * @param kind index into dosZipSources / sm.statDetails
+  * @param v the value to write
+  * @throws IOException passed through from the underlying stream
+  */
+ private void writeVnumber(int kind, int v) throws IOException {
+   DataIO.writeVnumber(dosZipSources[kind], v);
+   if (!doMeasurements) {
+     return;
+   }
+   sm.statDetails[kind].incr(DataIO.lengthVnumber(v));
+ }
+
+ /**
+  * Writes a long as a variable-length number to the stream registered for
+  * the given slot-kind index and, when measuring, records the encoded length.
+  *
+  * @param kind index into dosZipSources / sm.statDetails
+  * @param v the value to write
+  * @throws IOException passed through from the underlying stream
+  */
+ private void writeVnumber(int kind, long v) throws IOException {
+   DataIO.writeVnumber(dosZipSources[kind], v);
+   if (!doMeasurements) {
+     return;
+   }
+   sm.statDetails[kind].incr(DataIO.lengthVnumber(v));
+ }
+
+ /**
+  * Writes an int as a variable-length number directly to the given stream.
+  * No statistics are recorded here; the caller is responsible for that.
+  */
+ private void writeVnumber(DataOutputStream s, int v) throws IOException {
+   DataIO.writeVnumber(s, v);
+ }
+
+ /**
+  * Writes a long as a variable-length number directly to the given stream.
+  * No statistics are recorded here; the caller is responsible for that.
+  */
+ private void writeVnumber(DataOutputStream s, long v) throws IOException {
+   DataIO.writeVnumber(s, v);
+ }
+
+ /**
+  * Writes a single byte (the low-order 8 bits of v) to the given stream.
+  * No statistics are recorded here; the caller is responsible for that.
+  */
+ private void writeUnsignedByte(DataOutputStream s, int v) throws IOException {
+   s.write(v);
+ }
+
+ /**
+  * Writes one double, supplied as its raw long bit pattern (sign in bit 63,
+  * 11-bit exponent field in bits 52-62, 52-bit fraction in bits 0-51).
+  *
+  * A raw value of 0 is written as a lone 0 on the exponent stream. Otherwise
+  * the exponent field is rebased by 1023, non-negative results are bumped by one
+  * (so 0 stays reserved for the value 0), and the result is sign-encoded.
+  * The fraction is written bit-reversed and shifted left by one, with the sign
+  * carried in the low-order bit.
+  *
+  * @param raw the bits of the double to write
+  * @throws IOException passed through from the underlying streams
+  */
+ private void writeDouble(long raw) throws IOException {
+   if (raw != 0L) {
+     final int rebased = ((int) ((raw >>> 52) & 0x7ff)) - 1023; // so that 1.0 has exponent 0
+     final int skippingZero = (rebased >= 0) ? (rebased + 1) : rebased; // 0 is taken by the value 0
+     final int exponent = encodeIntSign(skippingZero);
+
+     final long signBit = (raw < 0) ? 1L : 0L;
+     final long fraction = raw & 0x000fffffffffffffL;
+     final long mants = (Long.reverse(fraction << 12) << 1) + signBit;
+
+     writeVnumber(double_Exponent_dos, exponent);
+     writeVnumber(double_Mantissa_Sign_dos, mants);
+     if (doMeasurements) {
+       sm.statDetails[double_Exponent_i].incr(DataIO.lengthVnumber(exponent));
+       sm.statDetails[double_Mantissa_Sign_i].incr(DataIO.lengthVnumber(mants));
+     }
+     return;
+   }
+
+   // the value 0: a lone 0 in the exponent stream
+   writeVnumber(double_Exponent_dos, 0);
+   if (doMeasurements) {
+     sm.statDetails[double_Exponent_i].incr(1);
+   }
+ }
+
+ /**
+  * Moves the sign of v into the low-order bit: the magnitude is shifted left
+  * by one and bit 0 is set for negative values.
+  *
+  * @param v the value to encode
+  * @return the sign-encoded value
+  */
+ private int encodeIntSign(int v) {
+   return (v < 0) ? (((-v) << 1) | 1) : (v << 1);
+ }
+
+ /**
+  * Writes v to the stream for the given kind, either as its own value or as a
+  * difference from prev, whichever has the smaller magnitude.
+  *
+  * Encoding of the written number:
+  *   bit 0 = delta: 1 means the magnitude is a difference from prev
+  *   bit 1 = sign:  1 means negative
+  *   remaining bits = magnitude
+  * The value 0 is written as 0, and Integer.MIN_VALUE is written as 2 ("-0").
+  * A difference is only considered when v and prev are both positive or both negative.
+  *
+  * For the heapRef_i and fsIndexes_i kinds, v (and a non-zero prev) are first
+  * converted from source heap addresses to target sequence numbers. A heap ref
+  * whose target is not in the target type system (-1) is written as 0.
+  *
+  * @param kind index of the stream / statistics slot to write to
+  * @param v the value to write
+  * @param prev the previous value to diff against
+  * @throws IOException passed through from the underlying stream
+  */
+ private void writeDiff(int kind, int v, int prev) throws IOException {
+   if (v == 0) {
+     write0(kind);
+     return;
+   }
+
+   if (v == Integer.MIN_VALUE) { // has no positive counterpart, so it gets its own code
+     writeVnumber(kind, 2); // "-0"
+     if (doMeasurements) {
+       sm.statDetails[kind].diffEncoded ++;
+       sm.statDetails[kind].valueLeDiff ++;
+     }
+     return;
+   }
+
+   // fsIndexes_i is used when writing out modified FSs
+   final boolean isRefKind = (kind == heapRef_i) || (kind == fsIndexes_i);
+   if (isRefKind) {
+     // refs are written as target sequence numbers, not heap addresses
+     v = fsStartIndexes.getTgtSeqFromSrcAddr(v);
+     if (v == -1) { // refers to an FS that is not in the target: substitute null
+       if (kind == fsIndexes_i) {
+         // can't happen - delta ser never done with a tgtTs different from srcTs
+         throw new RuntimeException();
+       }
+       write0(kind);
+       return;
+     }
+     if (prev != 0) {
+       prev = fsStartIndexes.getTgtSeqFromSrcAddr(prev);
+     }
+   }
+
+   final int absV = Math.abs(v);
+   final boolean sameSign = ((v > 0) && (prev > 0)) || ((v < 0) && (prev < 0));
+
+   final long encoded;
+   final boolean wroteValue;
+   if (sameSign) {
+     final int diff = v - prev; // same sign, so this cannot overflow
+     final int absDiff = Math.abs(diff);
+     wroteValue = (absV <= absDiff);
+     if (wroteValue) {
+       encoded = ((long)absV << 2) + ((v < 0) ? 2L : 0L);
+     } else {
+       encoded = ((long)absDiff << 2) + ((diff < 0) ? 3L : 1L);
+     }
+   } else {
+     // differing signs (or prev == 0): the value itself is never larger than the diff
+     wroteValue = true;
+     encoded = ((long)absV << 2) + ((v < 0) ? 2 : 0);
+   }
+
+   writeVnumber(kind, encoded);
+   if (doMeasurements) {
+     sm.statDetails[kind].diffEncoded ++;
+     if (wroteValue) {
+       sm.statDetails[kind].valueLeDiff ++;
+     }
+   }
+ }
+
+ /**
+  * Writes the value 0 for the given kind; a shortcut that yields the same
+  * encoding writeDiff would otherwise produce for 0.
+  *
+  * @param kind index of the stream / statistics slot to write to
+  * @throws IOException passed through from the underlying stream
+  */
+ private void write0(int kind) throws IOException {
+   writeVnumber(kind, 0);
+   if (!doMeasurements) {
+     return;
+   }
+   sm.statDetails[kind].diffEncoded ++;
+   sm.statDetails[kind].valueLeDiff ++;
+ }
+ private void writeFromByteArray(SlotKind kind, int startPos, int length) throws IOException {
+ byte_dos.write(byteHeapObj.heap, startPos, length);
+ }
+
+ /**
+  * Writes length entries of the long heap, starting at startPos.
+  * Entries of a Slot_DoubleRef array are written as doubles; anything else
+  * is written as a long relative to the previous element
+  * (the first element is written relative to 0).
+  */
+ private void writeFromLongArray(SlotKind kind, int startPos, int length) throws IOException {
+   final long[] values = longHeapObj.heap;
+   final int limit = startPos + length;
+
+   if (kind == Slot_DoubleRef) {
+     for (int pos = startPos; pos < limit; pos++) {
+       writeDouble(values[pos]);
+     }
+     return;
+   }
+
+   long previous = 0;
+   for (int pos = startPos; pos < limit; pos++) {
+     final long current = values[pos];
+     writeLong(current, previous);
+     previous = current;
+   }
+ }
+
+ /**
+  * Writes length entries of the short heap, starting at startPos, each one
+  * diff-encoded against the previous element (the first against 0).
+  */
+ private void writeFromShortArray(int startPos, int length) throws IOException {
+   final short[] values = shortHeapObj.heap;
+   final int limit = startPos + length;
+   int previous = 0;
+   int pos = startPos;
+   while (pos < limit) {
+     final int current = values[pos++];
+     writeDiff(short_i, current, previous);
+     previous = current;
+   }
+ }
+
+ /******************************************************************************
+ * Modified Values
+ *
+ * Serializes the modifications recorded by the CAS.
+ *
+ * Output:
+ * The number of modified FSs (control stream), then
+ * for each FS that has 1 or more modified values,
+ * the heap addr of the FS converted to a seq # (done inside writeDiff),
+ * followed by the number of modified values in that FS.
+ *
+ * For all modified values within the FS:
+ * if it is an aux array element, write the index in the aux array
+ * (as an increment over the previous one) and the new value
+ * otherwise, write the slot offset (as an increment over the previous one) and the new value
+ *
+ * The field initializers below depend on running in textual order:
+ * fetch the arrays, sort them, then count the distinct entries.
+ ******************************************************************************/
+ public class SerializeModifiedFSs {
+ // arrays obtained from the modification lists kept by the CAS
+ final int[] modifiedMainHeapAddrs = cas.getModifiedFSHeapAddrs().toArray();
+ final int[] modifiedFSs = cas.getModifiedFSList().toArray();
+ final int[] modifiedByteHeapAddrs = cas.getModifiedByteHeapAddrs().toArray();
+ final int[] modifiedShortHeapAddrs = cas.getModifiedShortHeapAddrs().toArray();
+ final int[] modifiedLongHeapAddrs = cas.getModifiedLongHeapAddrs().toArray();
+ // must run after the arrays above are assigned and before the lengths below are computed
+ {sortModifications();} // a non-static initialization block
+ // number of distinct entries, compacted to the front of each (now sorted) array
+ final int modMainHeapAddrsLength = eliminateDuplicatesInMods(modifiedMainHeapAddrs);
+ final int modFSsLength = eliminateDuplicatesInMods(modifiedFSs);
+ final int modByteHeapAddrsLength = eliminateDuplicatesInMods(modifiedByteHeapAddrs);
+ final int modShortHeapAddrsLength = eliminateDuplicatesInMods(modifiedShortHeapAddrs);
+ final int modLongHeapAddrsLength = eliminateDuplicatesInMods(modifiedLongHeapAddrs);
+
+ // ima - index into modified arrays (cursor of the next entry not yet written)
+ // ixx, iPrevxxx - index in heap being changed
+ // value comes via the main heap or aux heaps
+
+ int imaModMainHeap = 0;
+ int imaModByteRef = 0;
+ int imaModShortRef = 0;
+ int imaModLongRef = 0;
+
+ // previous value - for things diff encoded
+ int vPrevModInt = 0;
+ int vPrevModHeapRef = 0;
+ short vPrevModShort = 0;
+ long vPrevModLong = 0;
+ // heap address and type info of the modified FS currently being written
+ int iHeap;
+ TypeInfo typeInfo;
+ /**
+ * Writes the count of modified FSs and then, for each one in ascending
+ * heap address order, its address followed by its modified values.
+ */
+ private void serializeModifiedFSs() throws IOException {
+ // write out number of modified Feature Structures
+ writeVnumber(control_dos, modFSsLength);
+ // iterate over all modified feature structures
+ /**
+ * Theorems about these data
+ * 1) Assumption: if an AuxHeap array is modified, its heap FS is in the list of modFSs
+ * 2) FSs with AuxHeap values have increasing ref values into the Aux heap as FS addr increases
+ * (because the ref is not updateable).
+ * 3) Assumption: String array element modifications are main heap slot changes
+ * and recorded as such
+ */
+
+ for (int i = 0; i < modFSsLength; i++) {
+ iHeap = modifiedFSs[i];
+ final int tCode = heap[iHeap];
+ typeInfo = ts.getTypeInfo(tCode);
+
+ // write out the address of the modified FS
+ // will convert to seq# internally
+ writeDiff(fsIndexes_i, iHeap, iPrevHeap);
+ /* delay updating iPrevHeap until end of "for" loop
+ * NOTE(review): iPrevHeap is not declared in this class and is not reset here,
+ * while the reading side starts its previous value at 0 - confirm it is 0 on entry.
+ */
+
+ /**************************************************
+ * handle aux byte, short, long array modifications
+ **************************************************/
+ if (typeInfo.isArray && (!typeInfo.isHeapStoredArray)) {
+ writeAuxHeapMods();
+ } else {
+ writeMainHeapMods();
+ } // end of processing 1 modified FS
+ iPrevHeap = iHeap;
+ } // end of for loop over all modified FSs
+ } // end of method
+
+ // sorts each modification array ascending; duplicates are dropped separately by eliminateDuplicatesInMods
+ private void sortModifications() {
+ Arrays.sort(modifiedMainHeapAddrs);
+ Arrays.sort(modifiedFSs);
+ Arrays.sort(modifiedByteHeapAddrs);
+ Arrays.sort(modifiedShortHeapAddrs);
+ Arrays.sort(modifiedLongHeapAddrs);
+ }
+ /**
+ * Compacts a sorted array in place so that its distinct values occupy the front.
+ * Entries past the returned length are left as they were.
+ * @param sorted an array sorted ascending
+ * @return the number of distinct values
+ */
+ private int eliminateDuplicatesInMods(final int[] sorted) {
+ int length = sorted.length;
+ if (length < 2) {
+ return length;
+ }
+
+ int prev = sorted[0];
+ int to = 1;
+ for(int from = 1; from < length; from++) {
+ int s = sorted[from];
+ if (s == prev) {
+ continue;
+ }
+ prev = s;
+ sorted[to] = s;
+ to++;
+ }
+ return to; // to is length
+ }
+ // counts the modified main heap slots of the current FS, which spans fsLength heap cells from iHeap
+ private int countModifiedSlotsInFs(int fsLength) {
+ return countModifiedSlots(iHeap, fsLength, modifiedMainHeapAddrs, imaModMainHeap, modMainHeapAddrsLength);
+ }
+ // counts the modified aux heap entries of the current array FS: heap[iHeap + 2] is its first aux index, heap[iHeap + 1] its length
+ private int countModifiedSlotsInAuxHeap(int[] modifiedAddrs, int indexInModAddrs, int length) {
+ return countModifiedSlots(heap[iHeap + 2], heap[iHeap + 1], modifiedAddrs, indexInModAddrs, length);
+ }
+ /**
+ * Counts how many consecutive entries of modifiedAddrs, starting at indexInModAddrs,
+ * are below firstAddr + length. The first entry must lie in
+ * firstAddr (inclusive) .. firstAddr + length (exclusive).
+ * @throws RuntimeException if length is 0 or the first entry lies outside that range
+ */
+ private int countModifiedSlots(int firstAddr, int length, int[] modifiedAddrs, int indexInModAddrs, int modAddrsLength) {
+ if (0 == length) {
+ throw new RuntimeException(); // can't happen
+ }
+ final int nextAddr = firstAddr + length;
+ int nextModAddr = modifiedAddrs[indexInModAddrs];
+ if ((firstAddr > nextModAddr) ||
+ (nextModAddr >= nextAddr)) {
+ throw new RuntimeException(); // never happen - must have one slot at least modified in this fs
+ }
+ int i = 1;
+ for (;; i++) {
+ if ((indexInModAddrs + i) == modAddrsLength) {
+ break;
+ }
+ nextModAddr = modifiedAddrs[indexInModAddrs + i];
+ if (nextModAddr >= nextAddr) {
+ break;
+ }
+ }
+ return i;
+ }
+ /**
+ * Writes the modified main heap slots of the current FS: their count, then for each
+ * the increase in slot offset over the previous one and the new value, encoded by slot kind.
+ * Advances imaModMainHeap past the entries written.
+ */
+ private void writeMainHeapMods() throws IOException {
+ final int fsLength = incrToNextFs(heap, iHeap, typeInfo);
+ final int numberOfModsInFs = countModifiedSlotsInFs(fsLength);
+ writeVnumber(fsIndexes_dos, numberOfModsInFs);
+ int iPrevOffsetInFs = 0;
+
+ for (int i = 0; i < numberOfModsInFs; i++) {
+ final int nextMainHeapIndex = modifiedMainHeapAddrs[imaModMainHeap++];
+ final int offsetInFs = nextMainHeapIndex - iHeap;
+
+ writeVnumber(fsIndexes_dos, offsetInFs - iPrevOffsetInFs);
+ iPrevOffsetInFs = offsetInFs;
+
+ final SlotKind kind = typeInfo.getSlotKind(typeInfo.isArray ? 2 : offsetInFs); // for arrays, slot kind 2 is the element kind
+
+ switch (kind) {
+ case Slot_HeapRef:
+ vPrevModHeapRef = writeIntOrHeapRef(heapRef_i, nextMainHeapIndex, vPrevModHeapRef);
+ break;
+ case Slot_Int:
+ vPrevModInt = writeIntOrHeapRef(int_i, nextMainHeapIndex, vPrevModInt);
+ break;
+ case Slot_Short:
+ vPrevModShort = (short)writeIntOrHeapRef(int_i, nextMainHeapIndex, vPrevModShort); // shorts go to the int stream here
+ break;
+ case Slot_LongRef:
+ vPrevModLong = writeLongFromHeapIndex(nextMainHeapIndex, vPrevModLong);
+ break;
+ case Slot_Byte: case Slot_Boolean:
+ byte_dos.write(heap[nextMainHeapIndex]);
+ break;
+ case Slot_Float:
+ writeFloat(heap[nextMainHeapIndex]);
+ break;
+ case Slot_StrRef:
+ writeString(stringHeapObj.getStringForCode(heap[nextMainHeapIndex]));
+ break;
+ case Slot_DoubleRef:
+ writeDouble(longHeapObj.getHeapValue(heap[nextMainHeapIndex]));
+ break;
+ default:
+ throw new RuntimeException();
+ }
+
+ } // end of looping for all modified slots in this FS
+ }
+ /**
+ * Writes the modified aux heap (byte / short / long) entries of the current array FS:
+ * their count, then for each the increase in array offset over the previous one and the new value.
+ */
+ private void writeAuxHeapMods() throws IOException {
+ final int auxHeapIndex = heap[iHeap + 2];
+ int iPrevOffsetInAuxArray = 0;
+
+ final SlotKind kind = typeInfo.getSlotKind(2); // get kind of element
+ final boolean isAuxByte = ((kind == Slot_BooleanRef) || (kind == Slot_ByteRef));
+ final boolean isAuxShort = (kind == Slot_ShortRef);
+ final boolean isAuxLong = ((kind == Slot_LongRef) || (kind == Slot_DoubleRef));
+
+ if (!(isAuxByte | isAuxShort | isAuxLong)) {
+ throw new RuntimeException(); // never happen
+ }
+
+ final int[] modXxxHeapAddrs = isAuxByte ? modifiedByteHeapAddrs :
+ isAuxShort ? modifiedShortHeapAddrs :
+ modifiedLongHeapAddrs;
+ final int modXxxHeapAddrsLength = isAuxByte ? modByteHeapAddrsLength :
+ isAuxShort ? modShortHeapAddrsLength :
+ modLongHeapAddrsLength;
+ int imaModXxxRef = isAuxByte ? imaModByteRef :
+ isAuxShort ? imaModShortRef :
+ imaModLongRef; // local copy of the cursor; the matching field is advanced in step at the end of each iteration
+
+ final int numberOfModsInAuxHeap = countModifiedSlotsInAuxHeap(modXxxHeapAddrs, imaModXxxRef, modXxxHeapAddrsLength);
+ writeVnumber(fsIndexes_dos, numberOfModsInAuxHeap);
+
+ for (int i = 0; i < numberOfModsInAuxHeap; i++) {
+ final int nextModAuxIndex = modXxxHeapAddrs[imaModXxxRef++];
+ final int offsetInAuxArray = nextModAuxIndex - auxHeapIndex;
+
+ writeVnumber(fsIndexes_dos, offsetInAuxArray - iPrevOffsetInAuxArray);
+ iPrevOffsetInAuxArray = offsetInAuxArray;
+
+ if (isAuxByte) {
+ writeUnsignedByte(byte_dos, byteHeapObj.getHeapValue(nextModAuxIndex));
+ } else if (isAuxShort) {
+ final short v = shortHeapObj.getHeapValue(nextModAuxIndex);
+ writeDiff(int_i, v, vPrevModShort); // shorts go to the int stream here
+ vPrevModShort = v;
+ } else {
+ long v = longHeapObj.getHeapValue(nextModAuxIndex);
+ if (kind == Slot_LongRef) {
+ writeLong(v, vPrevModLong);
+ vPrevModLong = v;
+ } else {
+ writeDouble(v);
+ }
+ }
+
+ if (isAuxByte) {
+ imaModByteRef++;
+ } else if (isAuxShort) {
+ imaModShortRef++;
+ } else {
+ imaModLongRef++;
+ }
+
+ }
+ }
+ } // end of class definition for SerializeModifiedFSs
+
+ /*************************************************************************************
+ * D E S E R I A L I Z E
+ *************************************************************************************/
+
+ /**
+  * Deserializes a CAS from the given stream: reads the header, then either
+  * checks that a delta can be applied or resets the CAS, and finally reads
+  * the rest of the stream.
+  *
+  * @param istream the stream to read; wrapped in a DataInputStream unless it already is one
+  * @throws IOException passed through from the underlying stream
+  * @throws UnsupportedOperationException if the stream holds a delta CAS and foundFSs is null
+  */
+ public void deserialize(InputStream istream) throws IOException {
+   if (istream instanceof DataInputStream) {
+     deserIn = (DataInputStream) istream;
+   } else {
+     deserIn = new DataInputStream(istream);
+   }
+
+   readHeader();
+
+   if (!isReadingDelta) {
+     cas.resetNoQuestions();
+   } else if (foundFSs == null) {
+     throw new UnsupportedOperationException("Deserializing Delta Cas, but original not serialized from");
+   }
+
+   deserializeAfterVersion(deserIn, isReadingDelta);
+ }
+ /**
+ * Deserializes everything that follows the header.
+ *
+ * Steps, in order: set up the read streams; read the common string(s); read the
+ * amount of heap used in the target and grow (delta) or re-size (non-delta) the
+ * main heap; read each feature structure, mapping its type code when type mapping
+ * is in effect; convert the recorded heap refs from target sequence numbers to
+ * source heap addresses; read the indexed FSs; for a delta, read the modified FSs;
+ * close the data inputs.
+ *
+ * @param istream the stream to read from
+ * @param isDelta true if the data being read is a delta CAS
+ * @throws IOException passed through from the underlying streams
+ * @throws IllegalStateException if reading a delta and fsStartIndexes.getNumberSrcFss() is 1
+ */
+ public void deserializeAfterVersion(DataInputStream istream, boolean isDelta) throws IOException {
+
+ deserIn = istream;
+ this.isDelta = isReadingDelta = isDelta;
+ setupReadStreams();
+
+ /************************************************
+ * Read in the common string(s)
+ ************************************************/
+ int lenCmnStrs = readVnumber(strChars_dis);
+ readCommonString = new String[lenCmnStrs];
+ for (int i = 0; i < lenCmnStrs; i++) {
+ readCommonString[i] = DataIO.readUTFv(strChars_dis);
+ }
+ only1CommonString = lenCmnStrs == 1; // with a single common string, readString reads no segment index
+ /***************************
+ * Prepare to walk main heap
+ ***************************/
+ int heapUsedInTarget = readVnumber(control_dis);
+ final Heap heapObj = cas.getHeap();
+
+ heapStart = isReadingDelta ? heapObj.getNextId() : 0;
+ stringTableOffset = isReadingDelta ? (stringHeapObj.getSize() - 1) : 0;
+
+ // This next is just an approximation - it may be too large or too small if type mapping is in effect.
+ // Subsequent code needs to check on each FS if there's enough space left. If not, it needs to grow.
+ // At end, the next avail position in the heap needs to be set to its actual value,
+ // in case the amount stored was smaller.
+ if (isReadingDelta) {
+ heapObj.grow(heapUsedInTarget);
+ } else {
+ heapObj.reinitSizeOnly(heapUsedInTarget);
+ }
+ heap = heapObj.heap; // because it may change if growing or reinit
+
+ Arrays.fill(iPrevHeapArray, 0);
+
+ if (heapStart == 0) {
+ heapStart = 1; // slot 0 not serialized, it's null / 0
+ }
+
+ // For Delta CAS,
+ // Reuse previously computed map of addr <--> seq for existing FSs below mark line
+ // map of seq(this CAS) <--> seq(incoming)
+ // that accounts for type code mismatch using typeMapper
+ // note: rest of maps computed incrementally as we deserialize
+ // Two possibilities: The CAS has a type, but the incoming is missing that type (services)
+ // The incoming has a type, but the CAS is missing it - (deser from file)
+ // Below the merge line: only the 1st is possible
+ // Above the merge line: only the 2nd is possible
+
+ if (isReadingDelta) {
+ // scan current source being added to / merged into
+ if (fsStartIndexes.getNumberSrcFss() == 1) {
+ throw new IllegalStateException("Reading Delta into CAS not serialized from");
+ }
+ }
+
+ fixupsNeeded = new IntVector(Math.max(16, heap.length / 10));
+
+ /**********************************************************
+ * Read in new FSs being deserialized and add them to heap
+ **********************************************************/
+ for (int iHeap = heapStart, targetHeapUsed = isReadingDelta ? 0 : 1; targetHeapUsed < heapUsedInTarget;) {
+ final int tgtTypeCode = readVnumber(typeCode_dis); // get type code
+ final int srcTypeCode = isTypeMapping ? typeMapper.mapTypeCodeTgt2Src(tgtTypeCode) : tgtTypeCode;
+ final boolean storeIt = (srcTypeCode != 0); // a mapped code of 0 means: read this FS but do not store it
+ if (storeIt) {
+ heap[iHeap] = srcTypeCode;
+ }
+ fsStartIndexes.addSrcAddrForTgt(iHeap, storeIt);
+ // A receiving client from a service always
+ // has a superset of the service's types due to type merging so this
+ // won't happen for that use case. But
+ // a deserialize-from-file could hit this if the receiving type system
+ // deleted a type.
+
+ // The strategy for deserializing heap refs depends on finding
+ // the prev value for that type. This can be skipped for
+ // types being skipped because they don't exist in the source
+
+ // typeInfo is Target Type Info
+ typeInfo = isTypeMapping ? tgtTs.getTypeInfo(tgtTypeCode) :
+ ts.getTypeInfo(srcTypeCode);
+ final TypeInfo srcTypeInfo =
+ (!isTypeMapping) ? typeInfo :
+ storeIt ? ts.getTypeInfo(srcTypeCode) :
+ null;
+ iPrevHeap = iPrevHeapArray[srcTypeCode]; // will be ignored for non-existent type
+
+ if (typeInfo.isHeapStoredArray) {
+ readHeapStoredArray(iHeap, storeIt);
+ } else if (typeInfo.isArray) {
+ readNonHeapStoredArray(iHeap, storeIt);
+ } else {
+ // is normal type with slots
+ if (isTypeMapping && storeIt) {
+ final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(srcTypeCode);
+ for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
+ final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1; // NOTE(review): readByKind treats -1 as "don't store"; if a missing feature is -1 in this table, the +1 makes it 0 - confirm
+ SlotKind kind = typeInfo.getSlotKind(i+1);
+ readByKind(iHeap, featOffsetInSrc, kind, storeIt);
+ }
+ } else {
+ for (int i = 1; i < typeInfo.slotKinds.length + 1; i++) {
+ SlotKind kind = typeInfo.getSlotKind(i);
+ readByKind(iHeap, i, kind, storeIt);
+ }
+ }
+ }
+ if (storeIt) {
+ iPrevHeapArray[srcTypeCode] = iHeap; // make this one the "prev" one for subsequent testing
+ }
+// todo need to incr src heap by amt filtered (in case some slots missing,
+// need to incr tgt (for checking end) by unfiltered amount
+// need to fixup final heap to account for skipped slots
+// need to have read skip slots not present in src
+ targetHeapUsed += incrToNextFs(heap, iHeap, typeInfo); // typeInfo is target type info
+ iHeap += storeIt ? incrToNextFs(heap, iHeap, srcTypeInfo) : 0; // a skipped FS takes no room in the source heap
+ }
+
+ final int end = fixupsNeeded.size();
+ for (int i = 0; i < end; i++) {
+ final int heapAddrToFix = fixupsNeeded.get(i);
+ heap[heapAddrToFix] = fsStartIndexes.getSrcAddrFromTgtSeq(heap[heapAddrToFix]); // heap refs were read as target seq numbers
+ }
+
+ readIndexedFeatureStructures();
+
+ if (isReadingDelta) {
+ (new ReadModifiedFSs()).readModifiedFSs();
+ }
+
+ closeDataInputs();
+// System.out.format("Deserialize took %,d ms%n", System.currentTimeMillis() - startTime1);
+ }
+
+ /**
+  * Reads an array whose elements live in one of the aux heaps (byte, short or long).
+  * Reads the length, then the elements; when storing, the aux heap start
+  * position is recorded in heap[iHeap + 2]. Nothing more is read for a length of 0.
+  *
+  * @param iHeap heap address of the array FS
+  * @param storeIt false to consume the data without storing it
+  * @throws IOException passed through from the underlying streams
+  */
+ private void readNonHeapStoredArray(int iHeap, boolean storeIt) throws IOException {
+   final int length = readArrayLength(iHeap, storeIt);
+   if (length == 0) {
+     return;
+   }
+
+   final SlotKind refKind = typeInfo.getSlotKind(2);
+   final int auxRef;
+   switch (refKind) {
+   case Slot_BooleanRef: case Slot_ByteRef:
+     auxRef = readIntoByteArray(length, storeIt);
+     break;
+   case Slot_ShortRef:
+     auxRef = readIntoShortArray(length, storeIt);
+     break;
+   case Slot_LongRef: case Slot_DoubleRef:
+     auxRef = readIntoLongArray(refKind, length, storeIt);
+     break;
+   default:
+     throw new RuntimeException();
+   }
+
+   if (storeIt) {
+     heap[iHeap + 2] = auxRef;
+   }
+ }
+
+ /**
+  * Reads an array length from the array length stream and, when storing,
+  * records it in heap[iHeap + 1].
+  *
+  * @return the length read
+  */
+ private int readArrayLength(int iHeap, boolean storeIt) throws IOException {
+   final int arrayLength = readVnumber(arrayLength_dis);
+   if (!storeIt) {
+     return arrayLength;
+   }
+   heap[iHeap + 1] = arrayLength;
+   return arrayLength;
+ }
+
+ /**
+  * Reads an array whose elements are stored in the main heap, starting at iHeap + 2.
+  *
+  * Int, short and heap-ref elements are diff-encoded: the first against element 0
+  * of the previous array of this type (0 if there is none or it is empty), each
+  * later one against its predecessor. Stored heap refs are recorded for later fixup.
+  *
+  * @param iHeap heap address of the array FS
+  * @param storeIt false to consume the data without storing it
+  * @throws IOException passed through from the underlying streams
+  */
+ private void readHeapStoredArray(int iHeap, boolean storeIt) throws IOException {
+   final int length = readArrayLength(iHeap, storeIt);
+   if (length == 0) {
+     return;
+   }
+
+   final SlotKind elementKind = typeInfo.slotKinds[1];
+   final int first = iHeap + 2;
+   final int limit = first + length;
+
+   switch (elementKind) {
+   case Slot_HeapRef: case Slot_Int: case Slot_Short: {
+     int prev = 0;
+     if ((iPrevHeap != 0) && (heap[iPrevHeap + 1] != 0)) { // a previous, non-empty array exists
+       prev = heap[iPrevHeap + 2]; // its 0th element
+     }
+     final boolean needsFixup = (elementKind == Slot_HeapRef);
+     for (int slot = first; slot < limit; slot++) {
+       final int v = readDiff(elementKind, prev);
+       prev = v;
+       if (storeIt) {
+         heap[slot] = v;
+         if (needsFixup) {
+           fixupsNeeded.add(slot);
+         }
+       }
+     }
+     break;
+   }
+   case Slot_Float:
+     for (int slot = first; slot < limit; slot++) {
+       final int floatBits = readFloat();
+       if (storeIt) {
+         heap[slot] = floatBits;
+       }
+     }
+     break;
+   case Slot_StrRef:
+     for (int slot = first; slot < limit; slot++) {
+       final int strRef = readString(storeIt);
+       if (storeIt) {
+         heap[slot] = strRef;
+       }
+     }
+     break;
+   default:
+     throw new RuntimeException("internal error");
+   }
+ }
+
+ /**
+  * Reads one slot value of the given kind from its stream and, when storing,
+  * puts it (or a reference to it) into heap[iHeap + offset].
+  *
+  * When the value is not stored it is still consumed from the stream, but the
+  * heap, the long heap and the fixup list are left untouched.
+  *
+  * @param iHeap heap address of the feature structure being read
+  * @param offset slot offset from iHeap; can be -1 - in which case read, but don't store
+  * @param kind the kind of slot to read
+  * @param storeIt false to read but not store
+  * @throws IOException passed through from the underlying streams
+  */
+ private void readByKind(int iHeap, int offset, SlotKind kind, boolean storeIt) throws IOException {
+
+   if (offset == -1) {
+     storeIt = false;
+   }
+   switch (kind) {
+   case Slot_Int: case Slot_Short:
+     readDiffWithPrevTypeSlot(kind, iHeap, offset, storeIt);
+     break;
+   case Slot_Float:
+     final int floatAsInt = readFloat();
+     if (storeIt) {
+       heap[iHeap + offset] = floatAsInt;
+     }
+     break;
+   case Slot_Boolean: case Slot_Byte:
+     final byte vByte = byte_dis.readByte();
+     if (storeIt) {
+       heap[iHeap + offset] = vByte;
+     }
+     break;
+   case Slot_HeapRef:
+     readDiffWithPrevTypeSlot(kind, iHeap, offset, storeIt);
+     // only a stored ref may be fixed up later; otherwise the fixup pass
+     // would rewrite a heap cell that was never written by this call
+     if (storeIt) {
+       fixupsNeeded.add(iHeap + offset);
+     }
+     break;
+   case Slot_StrRef:
+     final int vStrRef = readString(storeIt);
+     if (storeIt) {
+       heap[iHeap + offset] = vStrRef;
+     }
+     break;
+   case Slot_LongRef: {
+     // the previous value only matters when the result is kept; when skipping,
+     // heap[iPrevHeap + offset] must not be touched because offset may be -1
+     final long prev = (storeIt && (iPrevHeap != 0)) ?
+         longHeapObj.getHeapValue(heap[iPrevHeap + offset]) : 0L;
+     final long v = readLong(kind, prev);
+     if (storeIt) {
+       if (v == 0L) {
+         if (longZeroIndex == -1) {
+           longZeroIndex = longHeapObj.addLong(0L); // one shared entry for all 0 values
+         }
+         heap[iHeap + offset] = longZeroIndex;
+       } else {
+         heap[iHeap + offset] = longHeapObj.addLong(v);
+       }
+     }
+     break;
+   }
+   case Slot_DoubleRef: {
+     final long v = readDouble();
+     if (storeIt) {
+       if (v == 0L) {
+         if (longZeroIndex == -1) {
+           longZeroIndex = longHeapObj.addLong(0L); // one shared entry for all 0 values
+         }
+         heap[iHeap + offset] = longZeroIndex;
+       } else if (v == DBL_1) {
+         if (double1Index == -1) {
+           double1Index = longHeapObj.addLong(DBL_1); // one shared entry for all DBL_1 values
+         }
+         heap[iHeap + offset] = double1Index;
+       } else {
+         heap[iHeap + offset] = longHeapObj.addLong(v);
+       }
+     }
+     break;
+   }
+   default:
+     throw new RuntimeException("internal error");
+   } // end of switch
+ }
+
+ /**
+  * Reads the indexed feature structures and hands them to the CAS.
+  *
+  * The collected vector holds: number of views, number of sofas, one entry per
+  * sofa, and then per view the "added" part, followed for a delta by the
+  * "removed" and "reindexed" parts.
+  *
+  * @throws IOException passed through from the underlying streams
+  */
+ private void readIndexedFeatureStructures() throws IOException {
+   final int nbrViews = readVnumber(control_dis);
+   final int nbrSofas = readVnumber(control_dis);
+
+   IntVector fsIndexes = new IntVector(nbrViews + nbrSofas + 100);
+   fsIndexes.add(nbrViews);
+   fsIndexes.add(nbrSofas);
+
+   for (int sofa = 0; sofa < nbrSofas; sofa++) {
+     fsIndexes.add(readVnumber(control_dis));
+   }
+
+   for (int view = 0; view < nbrViews; view++) {
+     readFsxPart(fsIndexes); // added FSs
+     if (!isDelta) {
+       continue;
+     }
+     readFsxPart(fsIndexes); // removed FSs
+     readFsxPart(fsIndexes); // reindexed FSs
+   }
+
+   // getArray avoids copying.
+   // length is too long, but extra is never accessed
+   if (!isDelta) {
+     cas.reinitIndexedFSs(fsIndexes.getArray());
+   } else {
+     cas.reinitDeltaIndexedFSs(fsIndexes.getArray());
+   }
+ }
+
+ /**
+  * Reads one part of an FS index and appends it to fsIndexes as a count
+  * followed by that many source heap addresses.
+  *
+  * The entries arrive as increments over the previous target sequence number.
+  * Each is converted to a source heap address; entries that do not convert to
+  * a positive address (no source type for the target type) are dropped, so the
+  * stored count may be smaller than the count read.
+  *
+  * @param fsIndexes the vector to append to
+  * @throws IOException passed through from the underlying streams
+  */
+ private void readFsxPart(IntVector fsIndexes) throws IOException {
+   final int nbrEntries = readVnumber(control_dis);
+   final int countSlot = fsIndexes.size();
+   fsIndexes.add(0); // reserves the slot for the count, filled in below
+
+   int kept = 0;
+   int tgtSeq = 0;
+   for (int remaining = nbrEntries; remaining > 0; remaining--) {
+     tgtSeq += readVnumber(fsIndexes_dis);
+     final int srcAddr = fsStartIndexes.getSrcAddrFromTgtSeq(tgtSeq);
+     if (srcAddr > 0) { // if not, no src type for this type in tgtTs
+       fsIndexes.add(srcAddr);
+       kept++;
+     }
+   }
+   fsIndexes.set(countSlot, kept);
+ }
+
+
+ private DataInput getInputStream(SlotKind kind) {
+ return dataInputs[kind.ordinal()];
+ }
+
+ private int readVnumber(DataInputStream dis) throws IOException {
+ return DataIO.readVnumber(dis);
+ }
+
+ private long readVlong(DataInputStream dis) throws IOException {
+ return DataIO.readVlong(dis);
+ }
+
+ /**
+  * Reads length bytes from the byte stream.
+  *
+  * When storing, space is reserved in the byte heap and filled. Otherwise
+  * exactly length bytes are consumed and discarded.
+  *
+  * @param length the number of bytes to read
+  * @param storeIt false to consume the bytes without storing them
+  * @return the start position in the byte heap, or 0 if nothing was stored
+  * @throws IOException passed through from the underlying stream, including
+  *         end of stream being reached before length bytes were consumed
+  */
+ private int readIntoByteArray(int length, boolean storeIt) throws IOException {
+   if (storeIt) {
+     final int startPos = byteHeapObj.reserve(length);
+     byte_dis.readFully(byteHeapObj.heap, startPos, length);
+     return startPos;
+   }
+
+   // skipBytes is allowed to skip fewer bytes than asked for, so keep going
+   // until everything is consumed
+   int remaining = length;
+   while (remaining > 0) {
+     final int skipped = byte_dis.skipBytes(remaining);
+     if (skipped > 0) {
+       remaining -= skipped;
+     } else {
+       byte_dis.readByte(); // makes progress, or reports end of stream
+       remaining--;
+     }
+   }
+   return 0;
+ }
+
+ /**
+  * Reads length diff-encoded shorts from the short stream.
+  *
+  * When storing, space is reserved in the short heap and filled, each element
+  * decoded against its predecessor (the first against 0). Otherwise the
+  * values are read and discarded. The elements are variable-length encoded,
+  * so they cannot be skipped by byte count.
+  *
+  * @param length the number of elements to read
+  * @param storeIt false to consume the elements without storing them
+  * @return the start position in the short heap, or 0 if nothing was stored
+  * @throws IOException passed through from the underlying stream
+  */
+ private int readIntoShortArray(int length, boolean storeIt) throws IOException {
+   if (storeIt) {
+     final int startPos = shortHeapObj.reserve(length);
+     final short[] h = shortHeapObj.heap;
+     final int endPos = startPos + length;
+     short prev = 0;
+     for (int i = startPos; i < endPos; i++) {
+       h[i] = prev = (short)(readDiff(short_dis, prev));
+     }
+     return startPos;
+   }
+
+   // the previous value does not affect how much is consumed, so 0 will do
+   for (int i = 0; i < length; i++) {
+     readDiff(short_dis, 0);
+   }
+   return 0;
+ }
+
+ /**
+  * Reads length long or double elements, depending on kind.
+  *
+  * When storing, space is reserved in the long heap and filled; longs are
+  * decoded against their predecessor (the first against 0). Otherwise the
+  * values are read through the same decoding path and discarded, so that the
+  * right streams (long or double) are advanced by the right amount.
+  *
+  * @param kind Slot_DoubleRef for doubles, otherwise longs
+  * @param length the number of elements to read
+  * @param storeIt false to consume the elements without storing them
+  * @return the start position in the long heap, or 0 if nothing was stored
+  * @throws IOException passed through from the underlying streams
+  */
+ private int readIntoLongArray(SlotKind kind, int length, boolean storeIt) throws IOException {
+   if (storeIt) {
+     final int startPos = longHeapObj.reserve(length);
+     final long[] h = longHeapObj.heap;
+     final int endPos = startPos + length;
+     long prev = 0;
+     for (int i = startPos; i < endPos; i++) {
+       h[i] = prev = readLong(kind, prev);
+     }
+     return startPos;
+   }
+
+   // the previous value does not affect how much is consumed, so 0 will do
+   for (int i = 0; i < length; i++) {
+     readLong(kind, 0L);
+   }
+   return 0;
+ }
+
+ private void readDiffWithPrevTypeSlot(
+ SlotKind kind,
+ int iHeap,
+ int offset,
+ boolean storeIt) throws IOException {
+ if (storeIt) {
+ int prev = (iPrevHeap == 0) ? 0 : heap[iPrevHeap + offset];
+ heap[iHeap + offset] = readDiff(kind, prev);
+ } else {
+ readDiff(kind, 0);
+ }
+ }
+
+ private int readDiff(SlotKind kind, int prev) throws IOException {
+ return readDiff(getInputStream(kind), prev);
+ }
+
+ /**
+  * Reads one diff-encoded value.
+  *
+  * In the encoded number bit 0 says whether the magnitude is a difference from
+  * prev, bit 1 says whether it is negative, and the remaining bits hold the
+  * magnitude. A negative magnitude of 0 stands for Integer.MIN_VALUE.
+  *
+  * @param in the stream to read from
+  * @param prev the value a difference is relative to
+  * @return the decoded value
+  * @throws IOException passed through from the underlying stream
+  */
+ private int readDiff(DataInput in, int prev) throws IOException {
+   final long encoded = readVlong(in);
+   final int magnitude = (int)(encoded >>> 2);
+   final boolean negative = (encoded & 2L) != 0;
+
+   if (negative && (magnitude == 0)) {
+     return Integer.MIN_VALUE;
+   }
+
+   final int signed = negative ? -magnitude : magnitude;
+   final boolean isDeltaEncoded = (encoded & 1L) != 0;
+   return isDeltaEncoded ? (signed + prev) : signed;
+ }
+
+ /**
+  * Reads a long, or a double for Slot_DoubleRef.
+  *
+  * A long is read as two diff-encoded ints: the high half against the high
+  * half of prev, the low half against the low half of prev.
+  *
+  * @param kind Slot_DoubleRef to read a double, anything else to read a long
+  * @param prev the previous long value (not used for doubles)
+  * @return the value read
+  * @throws IOException passed through from the underlying streams
+  */
+ private long readLong(SlotKind kind, long prev) throws IOException {
+   if (kind != Slot_DoubleRef) {
+     final int high = readDiff(long_High_dis, (int) (prev >>> 32));
+     final int low = readDiff(long_Low_dis, (int) prev);
+     return (((long)high) << 32) | (0xffffffffL & (long)low);
+   }
+   return readDouble();
+ }
+
+ /**
+  * Consumes and discards length long values from the long streams.
+  *
+  * Each half of a long is one variable-length, diff-encoded number, so the
+  * values are read and thrown away instead of being skipped by byte count.
+  *
+  * @param length the number of long values to skip
+  * @throws IOException passed through from the underlying streams
+  */
+ private void skipLong(int length) throws IOException {
+   for (int i = 0; i < length; i++) {
+     // the previous value does not affect how much is consumed, so 0 will do
+     readDiff(long_High_dis, 0);
+     readDiff(long_Low_dis, 0);
+   }
+ }
+
+ /**
+  * Reads one float and returns its raw int bit pattern.
+  *
+  * An exponent of 0 stands for the value 0 and has no mantissa. Otherwise the
+  * exponent was written incremented by 1, and the mantissa number carries the
+  * sign in its low-order bit above which sits the bit-reversed fraction.
+  *
+  * @return the bits of the float
+  * @throws IOException passed through from the underlying streams
+  */
+ private int readFloat() throws IOException {
+   final int exponent = readVnumber(float_Exponent_dis);
+   if (exponent == 0) {
+     return 0;
+   }
+
+   final int mantsAndSign = readVnumber(float_Mantissa_Sign_dis);
+   final int signField = ((mantsAndSign & 1) == 1) ? 0x80000000 : 0;
+   final int fraction = Integer.reverse(mantsAndSign >>> 1) >>> 9;
+   final int exponentField = (exponent - 1) << 23;
+
+   return exponentField | fraction | signField;
+ }
+
+ private int decodeIntSign(int v) {
+ if (1 == (v & 1)) {
+ return - (v >>> 1);
+ }
+ return v >>> 1;
+ }
+
+ private long readDouble() throws IOException {
+ int exponent = readVnumber(double_Exponent_dis);
+ if (exponent == 0) {
+ return 0L;
+ }
+ long mants = readVlong(double_Mantissa_Sign_dis);
+ return decodeDouble(mants, exponent);
+ }
+
+ /**
+  * Rebuilds the raw bits of a double from its encoded parts.
+  *
+  * @param mants the sign in bit 0, above which sits the bit-reversed fraction
+  * @param exponent the sign-encoded exponent, rebased by 1023, with positive
+  *        values bumped by one
+  * @return the bits of the double
+  */
+ private long decodeDouble(long mants, int exponent) {
+   int rebased = decodeIntSign(exponent);
+   if (rebased > 0) {
+     rebased--; // undo the bump that kept 0 free for the value 0
+   }
+   final long exponentField = ((long)((rebased + 1023) & 0x7ff)) << 52;
+   final long signField = ((mants & 1) == 1) ? 0x8000000000000000L : 0L;
+   final long fraction = Long.reverse(mants >>> 1) >>> 12;
+   return exponentField | fraction | signField;
+ }
+
+ private long readVlong(DataInput dis) throws IOException {
+ return DataIO.readVlong(dis);
+ }
+
+ private int readString(boolean storeIt) throws IOException {
+ int length = decodeIntSign(readVnumber(strLength_dis));
+ if (0 == length) {
+ return 0;
+ }
+ if (1 == length) {
+ if (storeIt) {
+ return stringHeapObj.addString("");
+ } else {
+ return 0;
+ }
+ }
+
+ if (length < 0) { // in this case, -length is the slot index
+ if (storeIt) {
+ return stringTableOffset - length;
+ } else {
+ return 0;
+ }
+ }
+ int offset = readVnumber(strOffset_dis);
+ int segmentIndex = (only1CommonString) ? 0 :
+ readVnumber(strSeg_dis);
+ if (storeIt) {
+ String s = readCommonString[segmentIndex].substring(offset, offset + length - 1);
+ return stringHeapObj.addString(s);
+ } else {
+ return 0;
+ }
+ }
+
+ /******************************************************************************
+ * Modified Values
+ *
+ * Modified heap values need fsStartIndexes conversion
+ ******************************************************************************/
+
+ class ReadModifiedFSs {
+
+ // previous value - for things diff encoded
+ int vPrevModInt = 0;
+ int prevModHeapRefTgtSeq = 0;
+ short vPrevModShort = 0;
+ long vPrevModLong = 0;
+ int iHeap;
+ TypeInfo typeInfo;
+ int[] tgtF2srcF;
+
+ // for handling aux heaps with type mapping which may skip some things in the target
+ // An amount that needs to be added to the offset from target to account for
+ // source types and features not in the target.
+ //
+ // Because this is only done for Delta CAS, it is guaranteed that the
+ // target cannot contain types or features that are not in the source
+ // (due to type merging)
+// int[] srcHeapIndexOffset;
+//
+// Iterator<AuxSkip>[] srcSkipIt; // iterator over skip points
+// AuxSkip[] srcNextSkipped; // next skipped
+// int[] srcNextSkippedIndex;
+
+ /**
+  * Reads the modifications made to existing feature structures.
+  *
+  * Reads the number of modified FSs, then for each one its target sequence
+  * number (diff-encoded against the previous one), converts that to a source
+  * heap address, reads the number of modifications in it and applies them to
+  * the aux heap or the main heap, depending on the type.
+  *
+  * NOTE: no aux heap skip adjustment for type mapping is done here.
+  *
+  * @throws IOException passed through from the underlying streams
+  */
+ private void readModifiedFSs() throws IOException {
+   final int modFSsLength = readVnumber(control_dis);
+   int prevSeq = 0;
+
+   for (int fsCount = 0; fsCount < modFSsLength; fsCount++) {
+     prevSeq = readDiff(fsIndexes_dis, prevSeq);
+
+     iHeap = fsStartIndexes.getSrcAddrFromTgtSeq(prevSeq);
+     if (iHeap < 1) {
+       // never happen because delta CAS ts system case, the
+       // target is always a subset of the source
+       // due to type system merging
+       throw new RuntimeException("never happen");
+     }
+
+     final int typeCode = heap[iHeap];
+     typeInfo = ts.getTypeInfo(typeCode);
+     if (isTypeMapping) {
+       tgtF2srcF = typeMapper.getTgtFeatOffsets2Src(typeCode);
+     }
+
+     final int modCount = readVnumber(fsIndexes_dis);
+
+     // aux byte, short, long arrays (boolean is stored in the byte array);
+     // string arrays are heap stored and go the main heap way
+     final boolean isAuxArray = typeInfo.isArray && (!typeInfo.isHeapStoredArray);
+     if (isAuxArray) {
+       readModifiedAuxHeap(modCount);
+     } else {
+       readModifiedMainHeap(modCount);
+     }
+   }
+ }
+
+ /**
+  * Updates the byte/short/long aux heap entries of the current array FS.
+  * Only called for arrays.
+  * No aux heap offset adjustments are needed since the
+  * accurate source start point is taken from the source heap (heap[iHeap + 2]).
+  *
+  * Each modification is an array offset, written as an increase over the
+  * previous one, followed by the new value.
+  *
+  * @param numberOfMods the number of modified elements to read
+  * @throws IOException passed through from the underlying streams
+  */
+ private void readModifiedAuxHeap(int numberOfMods) throws IOException {
+   final int auxHeapIndex = heap[iHeap + 2];
+   final SlotKind kind = typeInfo.getSlotKind(2); // get kind of element
+   final boolean isAuxByte = ((kind == Slot_BooleanRef) || (kind == Slot_ByteRef));
+   final boolean isAuxShort = (kind == Slot_ShortRef);
+   final boolean isAuxLong = ((kind == Slot_LongRef) || (kind == Slot_DoubleRef));
+   if (!isAuxByte && !isAuxShort && !isAuxLong) {
+     throw new RuntimeException(); // never happen
+   }
+
+   int offset = 0;
+   for (int remaining = numberOfMods; remaining > 0; remaining--) {
+     offset += readVnumber(fsIndexes_dis);
+     final int target = auxHeapIndex + offset;
+
+     if (isAuxByte) {
+       byteHeapObj.setHeapValue(byte_dis.readByte(), target);
+       continue;
+     }
+
+     if (isAuxShort) {
+       final short v = (short)readDiff(int_dis, vPrevModShort);
+       vPrevModShort = v;
+       shortHeapObj.setHeapValue(v, target);
+       continue;
+     }
+
+     final long v = readLong(kind, vPrevModLong);
+     if (kind == Slot_LongRef) {
+       vPrevModLong = v; // doubles are not diff-encoded, so they leave the previous long alone
+     }
+     longHeapObj.setHeapValue(v, target);
+   }
+ }
+
+ /**
+  * Applies the modifications for the feature structure at iHeap whose
+  * modified slots are kept in the main heap.
+  * Each modified slot is identified by its offset in the target layout,
+  * stored as a difference from the previous offset.  When type mapping is in
+  * effect the offset is translated to the source layout through tgtF2srcF.
+  * The slot kind is looked up with, and the value is always stored at, the
+  * source offset.
+  *
+  * @param numberOfMods number of modified slots to read
+  * @throws IOException passed through from the underlying streams
+  * @throws RuntimeException if a target slot has no source counterpart, or
+  *         the slot kind is not one of those handled here
+  */
+ private void readModifiedMainHeap(int numberOfMods) throws IOException {
+   int iPrevTgtOffsetInFs = 0;
+   for (int i = 0; i < numberOfMods; i++) {
+     final int tgtOffsetInFs = readVnumber(fsIndexes_dis) + iPrevTgtOffsetInFs;
+     iPrevTgtOffsetInFs = tgtOffsetInFs;
+     final int srcOffsetInFs = isTypeMapping ? tgtF2srcF[tgtOffsetInFs] : tgtOffsetInFs;
+     if (srcOffsetInFs < 0) {
+       // never happen because if type mapping, and delta cas being deserialized,
+       // all of the target features would have been merged into the source ones.
+       throw new RuntimeException(
+           "never happen: target slot offset " + tgtOffsetInFs + " has no source slot");
+     }
+     // for arrays every element has the kind recorded for slot 2
+     final SlotKind kind = typeInfo.getSlotKind(typeInfo.isArray ? 2 : srcOffsetInFs);
+     // all values are stored in the source frame of reference
+     final int slotAddr = iHeap + srcOffsetInFs;
+
+     switch (kind) {
+     case Slot_HeapRef: {
+         final int tgtSeq = readDiff(heapRef_dis, prevModHeapRefTgtSeq);
+         prevModHeapRefTgtSeq = tgtSeq;
+         heap[slotAddr] = fsStartIndexes.getSrcAddrFromTgtSeq(tgtSeq);
+       }
+       break;
+     case Slot_Int: {
+         final int v = readDiff(int_dis, vPrevModInt);
+         vPrevModInt = v;
+         heap[slotAddr] = v;
+       }
+       break;
+     case Slot_Short: {
+         final int v = readDiff(int_dis, vPrevModShort);
+         vPrevModShort = (short)v;
+         heap[slotAddr] = v;
+       }
+       break;
+     case Slot_LongRef: case Slot_DoubleRef: {
+         final long v = readLong(kind, vPrevModLong);
+         // only long values update the "previous" value; doubles do not
+         if (kind == Slot_LongRef) {
+           vPrevModLong = v;
+         }
+         heap[slotAddr] = longHeapObj.addLong(v);
+       }
+       break;
+     // The next three cases previously stored at the target offset, which is
+     // the wrong slot whenever type mapping makes the two offsets differ.
+     case Slot_Byte: case Slot_Boolean:
+       heap[slotAddr] = byte_dis.readByte();
+       break;
+     case Slot_Float:
+       heap[slotAddr] = readFloat();
+       break;
+     case Slot_StrRef:
+       heap[slotAddr] = readString(true);
+       break;
+     default:
+       throw new RuntimeException("unexpected slot kind in modified main heap: " + kind);
+     }
+   }
+ }
+ }
+
+
+ /********************************************************************
+ * methods common to serialization / deserialization etc.
+ ********************************************************************/
+
+
+ /**
+  * Computes the increment from the start of one feature structure to the
+  * start of the next, using the layout described by typeInfo.
+  *
+  * @param heap the main heap
+  * @param iHeap heap index of the start of the feature structure
+  * @param typeInfo type information used to size the feature structure
+  * @return for heap-stored arrays, 2 plus the value in the cell at iHeap + 1;
+  *         otherwise 1 plus the number of slots of the type
+  */
+ private static int incrToNextFs(int[] heap, int iHeap, TypeInfo typeInfo) {
+   return typeInfo.isHeapStoredArray
+       ? 2 + heap[iHeap + 1]
+       : 1 + typeInfo.slotKinds.length;
+ }
+
+ /**
+  * This routine uses the same "scanning" to do two completely different things.
+  *
+  * When isWrite is false, it generates the set (foundFSs) of all FSs that are
+  * to be serialized: those that are in some index, or are pointed to by
+  * something that is in some index (recursively).
+  *
+  * When isWrite is true, it serializes out the index information.
+  * This step has to wait until the first kind of call has completed and
+  * the fsStartIndexes instance has had a chance to be built.
+  *
+  * Layout of the index array as consumed here: [0] number of views,
+  * [1] number of sofas, then the sofa FS addresses, then for each view one
+  * counted list of added FSs.  When serializing a delta, each view has two
+  * further counted lists (removed FSs, reindexed FSs).  Those two lists are
+  * stepped over in both modes so that the next view is read from the right
+  * position; nothing from them is enqueued or written here.
+  * NOTE(review): confirm against the deserializer that the removed and
+  * reindexed lists are intentionally not written.
+  *
+  * The cas is passed in so that the Compare can use this for two different CASes
+  *
+  * @param cas the CAS whose indexed FSs are processed
+  * @param isWrite true to write the index information, false to collect foundFSs
+  * @throws IOException passed through from the output streams
+  */
+ private void processIndexedFeatureStructures(CASImpl cas, boolean isWrite) throws IOException {
+   if (!isWrite) {
+     foundFSs = new IntArrayRBT();
+   }
+   final int[] fsIndexes = isSerializingDelta ? cas.getDeltaIndexedFSs(mark) : cas.getIndexedFSs();
+   final int nbrViews = fsIndexes[0];
+   final int nbrSofas = fsIndexes[1];
+
+   if (isWrite) {
+     if (doMeasurements) {
+       sm.statDetails[fsIndexes_i].original = fsIndexes.length * 4 + 1;
+     }
+     writeVnumber(control_i, nbrViews);
+     writeVnumber(control_i, nbrSofas);
+     if (doMeasurements) {
+       sm.statDetails[fsIndexes_i].incr(1); // an approximation - probably correct
+       sm.statDetails[fsIndexes_i].incr(1);
+     }
+   }
+
+   int fi = 2;
+   final int end1 = nbrSofas + 2;
+   for (; fi < end1; fi++) {
+     // version 0 wrote the sofa FS address itself; version 1 writes its target sequence number
+     final int addrSofaFs = fsIndexes[fi];
+     if (isWrite) {
+       final int v = fsStartIndexes.getTgtSeqFromSrcAddr(addrSofaFs);
+       writeVnumber(control_i, v); // version 1
+
+       if (doMeasurements) {
+         sm.statDetails[fsIndexes_i].incr(DataIO.lengthVnumber(v));
+       }
+     } else {
+       enqueueFS(foundFSs, addrSofaFs);
+     }
+   }
+
+   for (int vi = 0; vi < nbrViews; vi++) {
+     fi = processFsxPart(fsIndexes, fi, foundFSs, isWrite); // added FSs
+     if (isSerializingDelta) {
+       // Step over these lists in the scan pass as well as the write pass.
+       // Previously they were skipped only when writing, which left fi on
+       // the removed list and misread every following view while scanning.
+       fi = processFsxPart(fsIndexes, fi, null, false); // removed FSs
+       fi = processFsxPart(fsIndexes, fi, null, false); // reindexed FSs
+     }
+   }
+ }
+
+ /**
+  * Processes one counted list of FS addresses inside the index array: the
+  * cell at fsNdxStart holds the number of entries, which follow immediately.
+  *
+  * The entries are copied and sorted first; order doesn't matter to the
+  * logic, but sorted values compress better when written as differences.
+  *
+  * When isWrite is false, every entry is passed to enqueueFS.
+  * When isWrite is true, every entry that has a target sequence number is
+  * written to fsIndexes_dos as the difference from the previously written
+  * sequence number; entries the target doesn't have are skipped.  The
+  * number of entries actually written (version 1 of the format) goes to
+  * control_dos afterwards, since it can be less than the original count.
+  *
+  * @param fsIndexes the index array
+  * @param fsNdxStart position of the entry count of the list to process
+  * @param foundFSs set receiving the FSs when not writing; may be null
+  * @param isWrite true to write, false to enqueue
+  * @return position in fsIndexes just past this list
+  * @throws IOException passed through from the output streams
+  */
+ private int processFsxPart(
+     int[] fsIndexes,
+     int fsNdxStart,
+     IntArrayRBT foundFSs,
+     boolean isWrite) throws IOException {
+   final int nbrEntries = fsIndexes[fsNdxStart];
+   final int firstEntry = fsNdxStart + 1;
+   final int nextPart = firstEntry + nbrEntries;
+
+   final int[] sortedAddrs = new int[nbrEntries];
+   System.arraycopy(fsIndexes, firstEntry, sortedAddrs, 0, nbrEntries);
+   Arrays.sort(sortedAddrs);
+
+   if (!isWrite) {
+     for (final int fsAddr : sortedAddrs) {
+       enqueueFS(foundFSs, fsAddr);
+     }
+     return nextPart;
+   }
+
+   int prevTgtSeq = 0;
+   int nbrWritten = 0; // can be less than nbrEntries if type mapping excludes some types in target
+   for (final int fsAddr : sortedAddrs) {
+     final int tgtSeq = fsStartIndexes.getTgtSeqFromSrcAddr(fsAddr);
+     if (tgtSeq != -1) { // -1: the target doesn't have this FS, so it is skipped
+       final int diff = tgtSeq - prevTgtSeq;
+       nbrWritten++;
+       writeVnumber(fsIndexes_dos, diff);
+       if (doMeasurements) {
+         sm.statDetails[fsIndexes_i].incr(DataIO.lengthVnumber(diff));
+       }
+       prevTgtSeq = tgtSeq;
+     }
+   }
+
+   writeVnumber(control_dos, nbrWritten); // version 1
+   if (doMeasurements) {
+     sm.statDetails[typeCode_i].incr(DataIO.lengthVnumber(nbrWritten));
+   }
+   return nextPart;
+ }
+
+ /**
+  * Adds the FS at fsAddr to foundFSs, followed by everything reachable from
+  * its features, unless it should be ignored.
+  * Ignored are: a null set, the address 0, an FS already in the set, and,
+  * when serializing a delta, an FS that the mark does not report as new.
+  *
+  * @param foundFSs the set being built; when null nothing is done
+  * @param fsAddr heap address of the FS; 0 is ignored
+  */
+ private void enqueueFS(IntArrayRBT foundFSs, int fsAddr) {
+   if (foundFSs == null || fsAddr == 0) {
+     return;
+   }
+   if (foundFSs.containsKey(fsAddr)) {
+     return;
+   }
+   if (isSerializingDelta && !mark.isNew(fsAddr)) {
+     return;
+   }
+   foundFSs.insertKey(fsAddr);
+   enqueueFeatures(foundFSs, fsAddr);
+ }
+
+ /**
+  * Enqueue all FSs reachable from features of the given FS.
+  * For an FS array every element is enqueued; other arrays hold no FS
+  * references and are ignored.  For any other FS, each slot of kind
+  * Slot_HeapRef is enqueued; with type mapping only the slots listed in the
+  * type mapper's target-to-source offsets are considered.
+  *
+  * @param foundFSs the set being built
+  * @param addr heap address of the FS whose references are followed
+  */
+ private void enqueueFeatures(IntArrayRBT foundFSs, int addr) {
+   final int typeCode = heap[addr];
+   final TypeInfo info = ts.getTypeInfo(typeCode);
+   final SlotKind[] slotKinds = info.slotKinds;
+
+   if (info.isHeapStoredArray && (slotKinds[1] == Slot_HeapRef)) {
+     // fs array: the length is in the cell after the type code, elements follow
+     final int nbrElements = heap[addr + 1];
+     for (int e = 0; e < nbrElements; e++) {
+       enqueueFS(foundFSs, heap[addr + 2 + e]);
+     }
+     return;
+   }
+
+   // any other kind of array has nothing to follow
+   if (info.isArray) {
+     return;
+   }
+
+   if (!isTypeMapping) {
+     // slot s of the type is kept in heap cell addr + s + 1
+     for (int s = 0; s < slotKinds.length; s++) {
+       if (slotKinds[s] == Slot_HeapRef) {
+         enqueueFS(foundFSs, heap[addr + s + 1]);
+       }
+     }
+     return;
+   }
+
+   final int[] tgt2src = typeMapper.getTgtFeatOffsets2Src(typeCode);
+   for (final int srcSlot : tgt2src) {
+     if (srcSlot == -1) {
+       throw new RuntimeException(); // never happen because for serialization, target is never a superset of features of src
+     }
+     if (slotKinds[srcSlot] == Slot_HeapRef) {
+       enqueueFS(foundFSs, heap[addr + srcSlot + 1]); // add one for origin 1
+     }
+   }
+ }
+
+
+ /**
+  * Walks the FSs in foundFSs and records, for each one, its source heap
+  * address together with the target heap position it would have.
+  *
+  * Serializing:
+  *   Called at beginning, scans whole CAS
+  * Deserializing (Delta Cas only):
+  *   Called at beginning if doing delta CAS, scans old CAS up to mark
+  *
+  * When the string optimizer (os) is set, the strings of FSs whose type is
+  * included in the target are added to it, limited to string slots that are
+  * in the target and to strings at or above the mark's string heap position
+  * (0 when not delta).
+  *
+  * @return amount of heap used in target, counting only FSs at or above
+  *         heapStart; side effect: set up fsStartIndexes (for both src and tgt)
+  */
+ private int initFsStartIndexes () {
+
+   final boolean mapTypes = isTypeMappingCmn;
+   final CasTypeSystemMapper mapper = typeMapperCmn;
+   final IntListIterator srcAddrs = foundFSs.iterator();
+
+   int tgtHeapUsed = 0;
+   int nextTgtHeap = 1;
+   final int firstNewString = (isDelta) ? mark.getNextStringHeapAddr() : 0;
+
+   while (srcAddrs.hasNext()) {
+     final int iSrcHeap = srcAddrs.next();
+     final int srcTypeCode = heap[iSrcHeap];
+     final int tgtTypeCode = mapTypes ? mapper.mapTypeCodeSrc2Tgt(srcTypeCode) : srcTypeCode;
+     final boolean inTarget = (tgtTypeCode != 0);
+
+     // record info for type: maps src heap to tgt seq
+     fsStartIndexes.addItemAddr(iSrcHeap, nextTgtHeap, inTarget);
+
+     final TypeInfo srcTypeInfo = ts.getTypeInfo(srcTypeCode);
+     if (!inTarget) {
+       // excluded types contribute no strings and use no target heap
+       continue;
+     }
+     final TypeInfo tgtTypeInfo = mapTypes ? mapper.tsTgt.getTypeInfo(tgtTypeCode) : srcTypeInfo;
+
+     // add strings for included types (only when serializing)
+     if (os != null) {
+       // The "at or above the mark" tests below depend on the fact that all new
+       // and updated strings are "added" at the end of the string heap in the
+       // current impl.
+       if (srcTypeInfo.isHeapStoredArray && (srcTypeInfo.slotKinds[1] == Slot_StrRef)) {
+         // the built-in string array type, which is final, so the target's
+         // element kind is the same
+         final int nbrElements = heap[iSrcHeap + 1];
+         for (int e = 0; e < nbrElements; e++) {
+           final int strHeapIndex = heap[iSrcHeap + 2 + e];
+           if (strHeapIndex >= firstNewString) {
+             os.add(stringHeapObj.getStringForCode(strHeapIndex));
+           }
+         }
+       } else {
+         final int[] strOffsets = srcTypeInfo.strRefOffsets;
+         final boolean[] fSrcInTgt = mapTypes ? mapper.getFSrcInTgt(srcTypeCode) : null;
+         for (final int srcOffset : strOffsets) { // offset to slot having str ref
+           // add only those strings in slots that are in target
+           if (mapTypes && !fSrcInTgt[srcOffset]) {
+             continue;
+           }
+           final int strHeapIndex = heap[iSrcHeap + srcOffset];
+           if (strHeapIndex >= firstNewString) {
+             os.add(stringHeapObj.getStringForCode(strHeapIndex));
+           }
+         }
+       }
+     }
+
+     // Advance to next Feature Structure in the target heap frame of reference
+     final int fsSizeInTgt = incrToNextFs(heap, iSrcHeap, tgtTypeInfo);
+     nextTgtHeap += fsSizeInTgt;
+     if (iSrcHeap >= heapStart) { // don't use up tgt heap if delta, and below the mark
+       tgtHeapUsed += fsSizeInTgt;
+     }
+   }
+
+   return tgtHeapUsed; // side effect: set up fsStartIndexes
+ }
+
+
+ /**
+  * Compare 2 CASes, with perhaps different type systems.
+  * If the type systems are different, a type mapper is constructed and used
+  * to selectively ignore types or features not in the other type system.
+  *
+  * Only feature structures reachable via indexes or refs are compared.
+  * The order must match.
+  *
+  * @param c1 CAS to compare
+  * @param c2 CAS to compare
+  * @return true if equal (for types / features in both)
+  */
+ public static boolean compareCASes(CASImpl c1, CASImpl c2) {
+   // CasCompare is an inner class, so it needs an enclosing instance
+   final BinaryCasSerDes6 enclosing = new BinaryCasSerDes6(c1);
+   final CasCompare comparer = enclosing.new CasCompare(c1, c2);
+   return comparer.compareCASes();
+ }
+
+ public class CasCompare {
+ /**
+ * Compare 2 CASes for equal
+ * The layout of refs to aux heaps does not have to match
+ */
+ final private CASImpl c1;
+ final private CASImpl c2;
+ final private TypeSystemImpl ts1;
+ final private TypeSystemImpl ts2;
+ final private boolean isTypeMapping;
+ final private CasTypeSystemMapper typeMapper;
+ final private Heap c1HO;
+ final private Heap c2HO;
+ final private int[] c1heap;
+ final private int[] c2heap;
+
+ private TypeInfo typeInfo;
+ private int c1heapIndex;
+ private int c2heapIndex;
+
+ private IntRedBlackTree addr2seq1 = new IntRedBlackTree();
+ private IntRedBlackTree addr2seq2 = new IntRedBlackTree();
+
+ public CasCompare(CASImpl c1, CASImpl c2) {
+ this.c1 = c1;
+ this.c2 = c2;
+ ts1 = c1.getTypeSystemImpl();
+ ts2 = c2.getTypeSystemImpl();
+ isTypeMapping = (ts1 != ts2);
+ typeMapper = isTypeMapping ? new CasTypeSystemMapper(ts1, ts2) : null;
+ c1HO = c1.getHeap();
+ c2HO = c2.getHeap();
+ c1heap = c1HO.heap;
+ c2heap = c2HO.heap;
+ }
+
+ public boolean compareCASes() {
+ IntArrayRBT c1FoundFSs;
+ IntArrayRBT c2FoundFSs;
+ try {
+ processIndexedFeatureStructures(c1, false);
+ c1FoundFSs = foundFSs;
+ foundFSs = new IntArrayRBT();
+ processIndexedFeatureStructures(c2, false);
+ c2FoundFSs = foundFSs;
+ } catch (IOException e) {
+ throw new RuntimeException(e); // never happen
+ }
+ IntPointerIterator c1Iterator = c1FoundFSs.pointerIterator();
+ IntPointerIterator c2Iterator = c2FoundFSs.pointerIterator();
+
+ int i = 0;
+ while (c1Iterator.isValid()) {
+ addr2seq1.put(c1Iterator.get(), i++);
+ c1Iterator.inc();
+ }
+ i = 0;
+ while (c2Iterator.isValid()) {
+ addr2seq2.put(c2Iterator.get(), i++);
+ c2Iterator.inc();
+ }
+
+ c1Iterator.moveToFirst();
+ c2Iterator.moveToFirst();
+
+// initFsStartIndexesCompare();
+
+ // Iterating over both CASes
+ // If c1 is past end, verify all c2 instances up to its end are not in c1
+ // If c2 is past end, verify all c1 instances up to its end are not in c1
+ // If c1's instance type exists in c2, compare & advance both iterators
+ // If c1's instance type doesn't exist in c2, advance c1 and continue
+ // If c2's instance type doesn't exist in c1, advance c2 and continue
+
+
+// final int endHeapSeqSrc = fsStartIndexes.getNbrOfItems();
+// c1heapIndex = 1;
+// c2heapIndex = 1;
+// boolean pastEnd1 = false;
+// boolean pastEnd2 = false;
+
+ while (c1Iterator.isValid() && c2Iterator.isValid()) {
+ c1heapIndex = c1Iterator.get();
+ c2heapIndex = c2Iterator.get();
[... 617 lines stripped ...]