You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2013/02/14 23:55:06 UTC

svn commit: r1446378 [2/3] - in /uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src: main/java/org/apache/uima/cas/impl/ main/java/org/apache/uima/util/impl/ test/java/org/apache/uima/cas/impl/

Added: uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
URL: http://svn.apache.org/viewvc/uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java?rev=1446378&view=auto
==============================================================================
--- uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java (added)
+++ uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java Thu Feb 14 22:55:06 2013
@@ -0,0 +1,2991 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.cas.impl;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.uima.cas.AbstractCas;
+import org.apache.uima.cas.CASRuntimeException;
+import org.apache.uima.cas.impl.SlotKinds.SlotKind;
+import org.apache.uima.cas.impl.TypeSystemImpl.TypeInfo;
+import org.apache.uima.internal.util.IntListIterator;
+import org.apache.uima.internal.util.IntPointerIterator;
+import org.apache.uima.internal.util.IntVector;
+import org.apache.uima.internal.util.rb_trees.IntArrayRBT;
+import org.apache.uima.internal.util.rb_trees.IntRedBlackTree;
+import org.apache.uima.jcas.JCas;
+import org.apache.uima.util.impl.DataIO;
+import org.apache.uima.util.impl.OptimizeStrings;
+import org.apache.uima.util.impl.SerializationMeasures;
+
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.*;
+
+/**
+ * User callable serialization and deserialization of the CAS in a compressed Binary Format
+ * 
+ * This serializes/deserializes the state of the CAS.  It has the capability to map type systems,
+ * so the sending and receiving type systems do not have to be the same.
+ *   - types and features are matched by name, and features must have the same range (slot kind)
+ *   - types and/or features in one type system not in the other are skipped over
+ * 
+ * Header specifies to reader the format, and the compression level.
+ * 
+ * How to Serialize:  
+ * 
+ * 1) create an instance of this class
+ * 2) call serialize() to serialize the CAS
+ * 3) If doing serialization to a target from which you expect to receive back a delta CAS,
+ *    keep this object and reuse it for deserializing the delta CAS.
+ *    
+ * Otherwise, you cannot reuse this object.
+ * 
+ * TypeSystemImpl objects are lazily augmented by customized TypeInfo instances for each type encountered in 
+ * serializing or deserializing.  These are preserved for future calls, so their setup / initialization is only
+ * needed the first time.
+ * 
+ * TypeSystemImpl objects are also lazily augmented by typeMappers for individual different target typesystems;
+ * these too are preserved and reused on future calls.
+ * 
+ * Compressed Binary CASes are designed to be "self-describing" - 
+ * The format of the compressed binary CAS, including version info, 
+ * is inserted at the beginning so that a proper deserialization method can be automatically chosen.  
+ *     
+ * Compressed Binary format supports type system mapping.  Types in the source which are not in the target
+ * (or vice versa) are omitted.  Types with "extra" features have their extra features omitted 
+ * (or on deserialization, they are set to their default value - null, or 0, etc.).  
+ * 
+ * How to Deserialize:
+ * 
+ * 1) get an appropriate CAS to deserialize into.  For delta CAS, it does not have to be empty, but it must
+ *    be the originating CAS from which the delta was produced.
+ * 2) call CASImpl: cas.reinit(inputStream)  This is the existing method
+ *    for binary deserialization, and it now handles this compressed version, too.
+ *    Delta cas is also supported.  This form requires both source and target type systems to be exactly the same.
+ *    
+ * An alternative interface for Deserialization with type system mapping:
+ * 1) get an appropriate CAS to deserialize into (as above).
+ * 2) create (or reuse, if delta deserializing) an instance of this class -> xxx 
+ * 3) call xxx.deserialize(receivingCas, inputStream, targetTypeSystemImpl)
+ *    where the targetTypeSystem is the type system of the cas being deserialized
+ * 
+ * Compression/Decompression
+ * Works in two stages:
+ *   application of Zip/Unzip to particular sub-collections of CAS data, 
+ *     grouped according to similar data distribution
+ *   collection of like kinds of data (to make the zipping more effective)
+ *   There can be up to ~20 of these collections, such as
+ *      control info, float-exponents, string chars
+ * Deserialization:
+ *   Read all bytes, 
+ *   create separate ByteArrayInputStreams for each segment
+ *   create appropriate unzip data input streams for these
+ *   
+ * API design
+ *   Slow but expensive data: 
+ *     extra type system info - lazily created and added to shared TypeSystemImpl object
+ *       set up per type actually referenced
+ *     mapper for type system - lazily created and added to shared TypeSystemImpl object
+ *       in identity-map cache (size limit = 10 per source type system?) - key is target typesystemimpl.
+ *   Defaulting:
+ *     flags:  doMeasurements, compressLevel, CompressStrategy
+ *   Per serialize call: cas, output, [target ts], [mark for delta]
+ *   Per deserialize call: cas, input, [target ts]
+ *   
+ *   CASImpl has instance method with defaulting args for serialization.
+ *   CASImpl has reinit which works with compressed binary serialization objects
+ *     if no type mapping
+ *     If type mapping, (new BinaryCasSerDes6().deserialize(in-steam, [targetTypeSystem])
+ *     
+ * Use Cases, filtering and delta
+ *   **************************************************************************
+ *   * (de)serialize * filter? * delta? * Use case
+ *   **************************************************************************
+ *   * serialize     *   N     *   N    * Saving a Cas, 
+ *   *               *         *        * sending Cas to svc with identical ts
+ *   **************************************************************************
+ *   * serialize     *   Y     *   N    * sending Cas to svc with 
+ *   *               *         *        * different ts (a subset)
+ *   **************************************************************************
+ *   * serialize     *   N     *   Y    * returning Cas to client
+ *   *               *         *        * (?? saving just a delta to disk??)
+ *   **************************************************************************
+ *   * serialize     *   Y     *   Y    * NOT SUPPORTED (not needed)  
+ *   **************************************************************************
+ *   * deserialize   *   N     *   N    * reading/(receiving) CAS, identical TS
+ *   **************************************************************************
+ *   * deserialize   *   Y     *   N    * reading/(receiving) CAS, different TS
+ *   **************************************************************************
+ *   * deserialize   *   N     *   Y    * receiving CAS, identical TS, 
+ *   **************************************************************************
+ *   * deserialize   *   Y     *   Y    * receiving CAS, different TS (tgt a feature subset)
+ *   **************************************************************************
+ */
+public class BinaryCasSerDes6 {
+
+  /**
+   * Version of the serializer/deserializer, used to allow deserialization of 
+   * older versions
+   * 
+   * Version 0 - initial SVN checkin
+   * Version 1 - changes to support CasTypeSystemMapper 
+   */
+  private static final int VERSION = 1;  
+  
+  private static final long DBL_1 = Double.doubleToLongBits(1D);
+
+  /**
+   * Compression alternatives
+   */
+  
+  public enum CompressLevel {
+    None(   Deflater.NO_COMPRESSION),
+    Fast(   Deflater.BEST_SPEED),
+    Default(Deflater.DEFAULT_COMPRESSION),
+    Best(   Deflater.BEST_COMPRESSION),
+    ;
+    final public int lvl;
+    CompressLevel(int lvl) {
+      this.lvl = lvl;
+    }
+  }
+  
+  public enum CompressStrat {
+    Default(      Deflater.DEFAULT_STRATEGY),
+    Filtered(     Deflater.FILTERED),
+    HuffmanOnly(  Deflater.HUFFMAN_ONLY),
+    ;
+    final public int strat;
+    CompressStrat(int strat) {
+      this.strat = strat;
+    }
+  }
+  /**
+   * Info reused for multiple serializations of same cas to multiple targets, or
+   * for deserializing with a delta cas.
+   * Reachable FSs and Sequence maps
+   * Reuseable in serialization, and for deserialization of delta cas
+   */
+  public static class ReuseInfo {
+    final private IntArrayRBT foundFSs; // ordered set of FSs found in indexes or linked from other found FSs
+    final private CasSeqAddrMaps fsStartIndexes;
+    final private TypeSystemImpl tgtTs;
+    
+    private ReuseInfo(
+        IntArrayRBT foundFSs, 
+        CasSeqAddrMaps fsStartIndexes,
+        TypeSystemImpl tgtTs) {
+      this.foundFSs = foundFSs;
+      this.fsStartIndexes = fsStartIndexes;
+      this.tgtTs = tgtTs;
+    }
+  }
+  
+  public ReuseInfo getReuseInfo() {
+    return new ReuseInfo(foundFSs, fsStartIndexes, tgtTs);
+  }
+    
+  // speedups - ints for SlotKind ordinals
+  final private static int arrayLength_i = Slot_ArrayLength.ordinal();
+  final private static int heapRef_i = Slot_HeapRef.ordinal();
+  final private static int int_i = Slot_Int.ordinal();
+  final private static int byte_i = Slot_Byte.ordinal();
+  final private static int short_i = Slot_Short.ordinal();
+  final private static int typeCode_i = Slot_TypeCode.ordinal();
+  final private static int strOffset_i = Slot_StrOffset.ordinal();
+  final private static int strLength_i = Slot_StrLength.ordinal();
+  final private static int long_High_i = Slot_Long_High.ordinal();
+  final private static int long_Low_i = Slot_Long_Low.ordinal();
+  final private static int float_Mantissa_Sign_i = Slot_Float_Mantissa_Sign.ordinal();
+  final private static int float_Exponent_i = Slot_Float_Exponent.ordinal();
+  final private static int double_Mantissa_Sign_i = Slot_Double_Mantissa_Sign.ordinal();
+  final private static int double_Exponent_i = Slot_Double_Exponent.ordinal();
+  final private static int fsIndexes_i = Slot_FsIndexes.ordinal();
+  final private static int strChars_i = Slot_StrChars.ordinal();
+  final private static int control_i = Slot_Control.ordinal();
+  final private static int strSeg_i = Slot_StrSeg.ordinal();
+
+  /**
+   * Things set up for one instance of this class
+   */
+  final private TypeSystemImpl ts;
+  final private CompressLevel compressLevel;
+  final private CompressStrat compressStrategy;  
+  
+  /**
+   * Things that are used by common routines among serialization and deserialization
+   */
+  private boolean isTypeMappingCmn;
+  private CasTypeSystemMapper typeMapperCmn;
+
+  /*****************************************************
+   *  Things for both serialization and Deserialization
+   *****************************************************/
+  final private CASImpl cas;  // cas being serialized
+  private int[] heap;           // main heap, can't be final because grow replaces it
+  final private StringHeap stringHeapObj;
+  final private LongHeap longHeapObj;
+  final private ShortHeap shortHeapObj;
+  final private ByteHeap byteHeapObj;
+
+  private int heapStart;
+  private int heapEnd;                 // set when deserializing   
+  private int totalMappedHeapSize = 0; // heapEnd - heapStart, but with FS that don't exist in the target type system deleted    
+
+  final private boolean isSerializingDelta;        // if true, there is a marker indicating the start spot(s)
+        private boolean isDelta;
+        private boolean isReadingDelta;
+  final private MarkerImpl mark;  // the mark to serialize from
+
+  final private CasSeqAddrMaps fsStartIndexes;
+  final private boolean doMeasurements;  // if true, doing measurements
+
+  private OptimizeStrings os;
+  private boolean only1CommonString;  // true if only one common string
+
+  final private TypeSystemImpl tgtTs;
+
+  private TypeInfo typeInfo; // type info for the current type being serialized/deserialized
+  final private CasTypeSystemMapper typeMapper;
+  final private boolean isTypeMapping;
+
+  final private int[] iPrevHeapArray; // index of previous instance of this typecode in heap, by typecode
+  private int iPrevHeap;        // 0 or heap addr of previous instance of current type
+
+  private IntArrayRBT foundFSs; // ordered set of FSs found in indexes or linked from other found FSs
+//  final private int[] typeCodeHisto = new int[ts.getTypeArraySize()]; 
+
+  /********************************* 
+   * Things for just serialization  
+   *********************************/
+  private DataOutputStream serializedOut;  // where to write out the serialized result
+  
+  final private SerializationMeasures sm;  // null or serialization measurements
+  final private ByteArrayOutputStream[] baosZipSources = new ByteArrayOutputStream[NBR_SLOT_KIND_ZIP_STREAMS];  // lazily created, indexed by SlotKind.i
+  final private DataOutputStream[] dosZipSources = new DataOutputStream[NBR_SLOT_KIND_ZIP_STREAMS];      // lazily created, indexed by SlotKind.i
+
+  final private int[] estimatedZipSize = new int[NBR_SLOT_KIND_ZIP_STREAMS]; // one entry for each output stream kind
+
+  // speedups
+  
+  // any use of these means caller handles measurement
+  // some of these are never used, because the current impl
+  //   is using the _i form to get measurements done
+  private DataOutputStream arrayLength_dos;
+  private DataOutputStream heapRef_dos;
+  private DataOutputStream int_dos;
+  private DataOutputStream byte_dos;
+  private DataOutputStream short_dos;
+  private DataOutputStream typeCode_dos;
+  private DataOutputStream strOffset_dos;
+  private DataOutputStream strLength_dos;
+  private DataOutputStream long_High_dos;
+  private DataOutputStream long_Low_dos;
+  private DataOutputStream float_Mantissa_Sign_dos;
+  private DataOutputStream float_Exponent_dos;
+  private DataOutputStream double_Mantissa_Sign_dos;
+  private DataOutputStream double_Exponent_dos;
+  private DataOutputStream fsIndexes_dos;
+  private DataOutputStream strChars_dos;
+  private DataOutputStream control_dos;
+  private DataOutputStream strSeg_dos;
+
+  /********************************** 
+   * Things for just deserialization  
+   **********************************/
+
+  private DataInputStream deserIn;
+  private int version;
+
+  final private DataInputStream[] dataInputs = new DataInputStream[NBR_SLOT_KIND_ZIP_STREAMS];
+  final private Inflater[] inflaters = new Inflater[NBR_SLOT_KIND_ZIP_STREAMS];
+
+  private IntVector fixupsNeeded;  // for deserialization, the "fixups" for relative heap refs needed  
+  private int stringTableOffset;
+  
+  /**
+   * These indexes remember sharable common values in aux heaps
+   * Values must be in aux heap, but not part of arrays there
+   *   so that rules out boolean, byte, and shorts
+   */
+  private int longZeroIndex = -1; // also used for double 0 index
+  private int double1Index = -1;
+
+  private String[] readCommonString;
+
+  // speedups
+  
+  private DataInputStream arrayLength_dis;
+  private DataInputStream heapRef_dis;
+  private DataInputStream int_dis;
+  private DataInputStream byte_dis;
+  private DataInputStream short_dis;
+  private DataInputStream typeCode_dis;
+  private DataInputStream strOffset_dis;
+  private DataInputStream strLength_dis;
+  private DataInputStream long_High_dis;
+  private DataInputStream long_Low_dis;
+  private DataInputStream float_Mantissa_Sign_dis;
+  private DataInputStream float_Exponent_dis;
+  private DataInputStream double_Mantissa_Sign_dis;
+  private DataInputStream double_Exponent_dis;
+  private DataInputStream fsIndexes_dis;
+  private DataInputStream strChars_dis;
+  private DataInputStream control_dis;
+  private DataInputStream strSeg_dis;
+
+
+  /**
+   * 
+   * @param ts Type System (the source type system)
+   * @param doMeasurements true if measurements should be collected
+   * @param compressLevel 
+   * @param compressStrategy
+   */
+  public BinaryCasSerDes6(
+      AbstractCas aCas,
+      MarkerImpl mark,
+      TypeSystemImpl tgtTs,
+      ReuseInfo rfs,
+      boolean doMeasurements, 
+      CompressLevel compressLevel, 
+      CompressStrat compressStrategy) {
+    cas = ((CASImpl) ((aCas instanceof JCas) ? ((JCas)aCas).getCas(): aCas)).getBaseCAS();
+    
+    this.ts = cas.getTypeSystemImpl();
+    this.mark = mark;
+    if (null != mark && !mark.isValid() ) {
+      throw new CASRuntimeException(
+                CASRuntimeException.INVALID_MARKER, new String[] { "Invalid Marker." });
+    }
+
+    this.doMeasurements = doMeasurements;
+    this.sm = doMeasurements ? new SerializationMeasures() : null;
+    
+    isDelta = isSerializingDelta = (mark != null);
+    doMeasurements = (sm != null);
+    typeMapperCmn = typeMapper = ts.getTypeSystemMapper(tgtTs);
+    isTypeMappingCmn = isTypeMapping = (null != typeMapper);
+    
+    heap = cas.getHeap().heap;
+    heapEnd = cas.getHeap().getCellsUsed();
+    heapStart = isSerializingDelta ? mark.getNextFSId() : 0;
+    
+    stringHeapObj = cas.getStringHeap();
+    longHeapObj   = cas.getLongHeap();
+    shortHeapObj  = cas.getShortHeap();
+    byteHeapObj   = cas.getByteHeap();
+       
+    iPrevHeapArray = new int[ts.getTypeArraySize()];
+
+    this.compressLevel = compressLevel;
+    this.compressStrategy = compressStrategy;
+    if (null != rfs) {
+      foundFSs = rfs.foundFSs;
+      fsStartIndexes = rfs.fsStartIndexes;
+      this.tgtTs = rfs.tgtTs;
+    } else {
+      fsStartIndexes = new CasSeqAddrMaps();
+      this.tgtTs = tgtTs;
+    }
+  }
+  
+  public BinaryCasSerDes6(AbstractCas cas) {
+    this(cas, null, null, null, false, CompressLevel.Default, CompressStrat.Default);
+  }
+  
+  public BinaryCasSerDes6(AbstractCas cas, MarkerImpl mark) {
+    this(cas, mark, null, null, false, CompressLevel.Default, CompressStrat.Default);
+  }
+  
+  public BinaryCasSerDes6(AbstractCas cas, MarkerImpl mark, TypeSystemImpl tgtTs) {
+    this(cas, mark, tgtTs, null, false, CompressLevel.Default, CompressStrat.Default);
+  }
+
+  public BinaryCasSerDes6(AbstractCas cas, ReuseInfo rfs) {
+    this(cas, null, null, rfs, false, CompressLevel.Default, CompressStrat.Default);
+  }
+
+  /*********************************************************************************************
+   * S e r i a l i z e r   Class for sharing variables among routines
+   * Class instantiated once per serialization
+   * Multiple serializations in parallel supported, with multiple instances of this
+   *********************************************************************************************/
+
+
+  /*************************************************************************************
+   *   S E R I A L I Z E
+   * @return null or serialization measurements (depending on setting of doMeasurements)
+   * @throws IOException
+   *************************************************************************************/
+  public SerializationMeasures serialize(Object out) throws IOException {
+    if (isSerializingDelta && (tgtTs != null)) {
+      throw new UnsupportedOperationException("Can't do Delta Serialization with different target TS");
+    }
+    setupOutputStreams(out);
+    
+    if (doMeasurements) {
+      System.out.println(printCasInfo(cas));
+      sm.origAuxBytes = cas.getByteHeap().getSize();
+      sm.origAuxShorts = cas.getShortHeap().getSize() * 2;
+      sm.origAuxLongs = cas.getLongHeap().getSize() * 8;
+      sm.totalTime = System.currentTimeMillis();
+    }
+
+    writeHeader();
+    os = new OptimizeStrings(doMeasurements);
+ 
+    /******************************************************************
+     * Find all FSs to be serialized via the indexes
+     *   including those FSs referenced  
+     ******************************************************************/
+    
+    if (foundFSs == null) {
+      processIndexedFeatureStructures(cas, false /* compute ref'd FSs, no write */);
+    }
+    
+    /***************************
+     * Prepare to walk main heap
+     * We prescan the main heap and
+     *   1) identify any types that should be skipped
+     *      building a source and target fsStartIndexes table
+     *   2) add all strings to the string table, 
+     *      for strings above the mark
+     ***************************/
+   
+      // scan thru all fs and save their offsets in the heap
+      // to allow conversion from addr to sequential fs numbers
+      // Also, compute sequential maps for non-equal type systems
+      // As a side effect, also add all strings that are included
+      // in the target type system to the set to be optimized.
+    totalMappedHeapSize = initFsStartIndexes();
+    if (heapStart == 0) {
+      totalMappedHeapSize++;  // include the null at the start
+      heapStart = 1;  // slot 0 not serialized, it's null / 0
+    }
+
+    /**************************
+     * Strings
+     **************************/
+    
+    os.optimize();
+    writeStringInfo();
+    
+    /***************************
+     * Prepare to walk main heap
+     ***************************/
+    writeVnumber(control_dos, totalMappedHeapSize);  
+    if (doMeasurements) {
+      sm.statDetails[Slot_MainHeap.ordinal()].original = (1 + heapEnd - heapStart) * 4;      
+    }
+    
+    Arrays.fill(iPrevHeapArray, 0);
+    
+    /***************************
+     * walk main heap
+     ***************************/
+
+    final IntListIterator foundFSsIterator = foundFSs.iterator();
+    int iHeap;
+    while (foundFSsIterator.hasNext()) {
+      iHeap = foundFSsIterator.next();
+//    for (int iHeap = heapStart; iHeap < heapEnd; iHeap += incrToNextFs(heap, iHeap, typeInfo)) {
+      final int tCode = heap[iHeap];  // get type code
+      final int mappedTypeCode = isTypeMapping ? typeMapper.mapTypeCodeSrc2Tgt(tCode) : tCode;
+      if (mappedTypeCode == 0) { // means no corresponding type in target system
+        continue;
+      }
+
+      typeInfo = ts.getTypeInfo(tCode);
+      iPrevHeap = iPrevHeapArray[tCode];
+      
+      writeVnumber(typeCode_dos, mappedTypeCode);
+
+      if (typeInfo.isHeapStoredArray) {
+        serializeHeapStoredArray(iHeap);
+      } else if (typeInfo.isArray) {
+        serializeNonHeapStoredArray(iHeap);
+      } else {
+        if (isTypeMapping) {
+          // Serialize out in the order the features are in the target
+          final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(tCode);
+          for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
+            final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1;  // add one for origin 1
+            if (featOffsetInSrc == 0) {
+              throw new RuntimeException(); // never happen because for serialization, target is never a superset of features of src
+            }
+            serializeByKind(iHeap, featOffsetInSrc);
+          }
+        } else {
+          for (int i = 1; i < typeInfo.slotKinds.length + 1; i++) {
+            serializeByKind(iHeap, i);
+          }
+        }
+      }
+    
+      iPrevHeapArray[tCode] = iHeap;  // make this one the "prev" one for subsequent testing
+      if (doMeasurements) {
+        sm.statDetails[typeCode_i].incr(DataIO.lengthVnumber(tCode));
+        sm.mainHeapFSs ++;
+      }
+    }  // end of heap walk
+    
+    processIndexedFeatureStructures(cas, true /* pass 2 */);
+
+    if (isSerializingDelta) {
+      (new SerializeModifiedFSs()).serializeModifiedFSs();
+    }
+
+    collectAndZip();
+
+    if (doMeasurements) {
+      sm.totalTime = System.currentTimeMillis() - sm.totalTime;
+    }
+    return sm;
+  }
+          
+  private void serializeHeapStoredArray(int iHeap) throws IOException {
+    final int length = serializeArrayLength(iHeap);
+    // output values
+    // special case 0 and 1st value
+    if (length == 0) {
+      return;
+    }
+    SlotKind arrayElementKind = typeInfo.slotKinds[1];
+    final int endi = iHeap + length + 2;
+    switch (arrayElementKind) {
+    case Slot_HeapRef: case Slot_Int: case Slot_Short:
+      {
+        int prev = (iPrevHeap == 0) ? 0 : 
+                   (heap[iPrevHeap + 1] == 0) ? 0 : // prev length is 0
+                    heap[iPrevHeap + 2];  // use prev array 1st element
+        for (int i = iHeap + 2; i < endi; i++) {
+          prev = writeIntOrHeapRef(arrayElementKind.ordinal(), i, prev);
+        }
+      }
+      break;
+    case Slot_Float: 
+      for (int i = iHeap + 2; i < endi; i++) {
+        writeFloat(heap[i]);
+      }
+      break;
+    case Slot_StrRef:
+      for (int i = iHeap + 2; i < endi; i++) {
+        writeString(stringHeapObj.getStringForCode(heap[i]));
+      }
+      break;
+      
+    default: throw new RuntimeException("internal error");
+    } // end of switch    
+  }
+  
+  private int writeIntOrHeapRef(int kind, int index, int prev) throws IOException {
+    final int v = heap[index];
+    writeDiff(kind, v, prev);
+    return v;
+  }
+  
+  private long writeLongFromHeapIndex(int index, long prev) throws IOException {
+    final long v = longHeapObj.getHeapValue(heap[index]);      
+    writeLong(v, prev); 
+    return v;
+  }
+  
+  private void serializeNonHeapStoredArray(int iHeap) throws IOException {
+    final int length = serializeArrayLength(iHeap);
+    if (length == 0) {
+      return;
+    }
+    SlotKind refKind = typeInfo.getSlotKind(2);
+    switch (refKind) {
+    case Slot_BooleanRef: case Slot_ByteRef:
+      writeFromByteArray(refKind, heap[iHeap + 2], length);
+      if (doMeasurements) {
+        sm.statDetails[byte_i].incr(1);
+        sm.origAuxByteArrayRefs += 4;
+      }
+      break; 
+    case Slot_ShortRef:
+      writeFromShortArray(heap[iHeap + 2], length);
+      if (doMeasurements) {
+        sm.origAuxShortArrayRefs += 4;
+      }
+      break; 
+    case Slot_LongRef: case Slot_DoubleRef:
+      writeFromLongArray(refKind, heap[iHeap + 2], length);
+      if (doMeasurements) {
+        sm.origAuxLongArrayRefs += 4;
+      }
+      break; 
+    default:
+      throw new RuntimeException();
+    }
+  }
+  
+  private void serializeByKind(int iHeap, int offset) throws IOException {
+    SlotKind kind = typeInfo.getSlotKind(offset);      
+    switch (kind) {
+    //Slot_Int, Slot_Float, Slot_Boolean, Slot_Byte, Slot_Short
+    case Slot_Int: case Slot_Short: case Slot_HeapRef:
+      serializeDiffWithPrevTypeSlot(kind, iHeap, offset);
+      break;
+    case Slot_Float:
+      writeFloat(heap[iHeap + offset]);
+      break;
+    case Slot_Boolean: case Slot_Byte:
+      byte_dos.write(heap[iHeap + offset]);
+      break;
+    case Slot_StrRef: 
+      writeString(stringHeapObj.getStringForCode(heap[iHeap + offset]));
+      break;
+    case Slot_LongRef: 
+      writeLongFromHeapIndex(iHeap + offset, 
+                (iPrevHeap == 0) ? 
+                  0L : 
+                  longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
+      break;
+    case Slot_DoubleRef: 
+      writeDouble(longHeapObj.getHeapValue(heap[iHeap + offset]));
+      break;
+    default: 
+      throw new RuntimeException("internal error");
+    } // end of switch
+  }
+  
+  private int serializeArrayLength(int iHeap) throws IOException {
+    final int length = heap[iHeap + 1];
+    writeVnumber(arrayLength_i, length);
+    return length;
+  }
+  
+  private void serializeDiffWithPrevTypeSlot(SlotKind kind, int iHeap, int offset) throws IOException {
+    int prev = (iPrevHeap == 0) ? 0 : heap[iPrevHeap + offset];
+    writeDiff(kind.ordinal(), heap[iHeap + offset], prev);
+  }
+  
+  /**
+   * Method:
+   *   write with deflation into a single byte array stream
+   *     skip if not worth deflating
+   *     skip the Slot_Control stream
+   *     record in the Slot_Control stream, for each deflated stream:
+   *       the Slot index
+   *       the number of compressed bytes
+   *       the number of uncompressed bytes
+   *   add to header:  
+   *     nbr of compressed entries
+   *     the Slot_Control stream size
+   *     the Slot_Control stream
+   *     all the zipped streams
+   *
+   * @throws IOException 
+   */
+  private void collectAndZip() throws IOException {
+    ByteArrayOutputStream baosZipped = new ByteArrayOutputStream(4096);
+    Deflater deflater = new Deflater(compressLevel.lvl, true);
+    deflater.setStrategy(compressStrategy.strat);
+    int nbrEntries = 0;
+    
+    List<Integer> idxAndLen = new ArrayList<Integer>();
+
+    for (int i = 0; i < baosZipSources.length; i++) {
+      ByteArrayOutputStream baos = baosZipSources[i];
+      if (baos != null) {
+        nbrEntries ++;
+        dosZipSources[i].close();
+        long startTime = System.currentTimeMillis();
+        int zipBufSize = Math.max(1024, baos.size() / 100);
+        deflater.reset();
+        DeflaterOutputStream cds = new DeflaterOutputStream(baosZipped, deflater, zipBufSize);       
+        baos.writeTo(cds);
+        cds.close();
+        idxAndLen.add(i);
+        if (doMeasurements) {
+          idxAndLen.add((int)(sm.statDetails[i].afterZip = deflater.getBytesWritten()));            
+          idxAndLen.add((int)(sm.statDetails[i].beforeZip = deflater.getBytesRead()));
+          sm.statDetails[i].zipTime = System.currentTimeMillis() - startTime;
+        } else {
+          idxAndLen.add((int)deflater.getBytesWritten());            
+          idxAndLen.add((int)deflater.getBytesRead());
+        }
+      } 
+    }
+    serializedOut.writeInt(nbrEntries);                     // write number of entries
+    for (int i = 0; i < idxAndLen.size();) {
+      serializedOut.write(idxAndLen.get(i++));
+      serializedOut.writeInt(idxAndLen.get(i++));
+      serializedOut.writeInt(idxAndLen.get(i++));
+    }
+    baosZipped.writeTo(serializedOut);                      // write Compressed info
+  }  
+ 
+  private void writeLong(long v, long prev) throws IOException {
+    writeDiff(long_High_i, (int)(v >>> 32), (int)(prev >>> 32));
+    writeDiff(long_Low_i,  (int)v, (int)prev);    
+  }
+
+  /**
+   * String encoding
+   *   Length = 0 - used for null, no offset written
+   *   Length = 1 - used for "", no offset written 
+   *   Length > 0 (subtract 1): used for actual string length
+   *   
+   *   Length < 0 - use (-length) as slot index  (minimum is 1, slot 0 is NULL)
+   *   
+   *   For length > 0, write also the offset.
+   *   
+   */
+  private void writeString(final String s) throws IOException {
+    if (null == s) {
+      writeVnumber(strLength_dos, 0);
+      if (doMeasurements) {
+        sm.statDetails[strLength_i].incr(1);
+      }
+      return;
+    } 
+    
+    int indexOrSeq = os.getIndexOrSeqIndex(s);
+    if (indexOrSeq < 0) {
+      final int v = encodeIntSign(indexOrSeq);
+      writeVnumber(strLength_dos, v);
+      if (doMeasurements) {
+        sm.statDetails[strLength_i].incr(DataIO.lengthVnumber(v));
+      }
+      return;
+    }
+    
+    if (s.length() == 0) {
+      writeVnumber(strLength_dos, encodeIntSign(1));
+      if (doMeasurements) {
+        sm.statDetails[strLength_i].incr(1);
+      }
+      return;
+    }
+    
+    if (s.length() == Integer.MAX_VALUE) {
+      throw new RuntimeException("Cannot serialize string of Integer.MAX_VALUE length - too large.");
+    }
+    
+    final int offset = os.getOffset(indexOrSeq);
+    final int length = encodeIntSign(s.length() + 1);  // all lengths sign encoded because of above
+    writeVnumber(strOffset_dos, offset);
+    writeVnumber(strLength_dos, length);
+    if (doMeasurements) {
+      sm.statDetails[strOffset_i].incr(DataIO.lengthVnumber(offset));
+      sm.statDetails[strLength_i].incr(DataIO.lengthVnumber(length));
+    }
+    if (!only1CommonString) {
+      final int csi = os.getCommonStringIndex(indexOrSeq);
+      writeVnumber(strSeg_dos, csi);
+      if (doMeasurements) {
+        sm.statDetails[strSeg_i].incr(DataIO.lengthVnumber(csi));
+      }
+    }
+  }
+
+  /**
+   * Need to support NAN sets, 
+   * 0x7fc.... for NAN
+   * 0xff8.... for NAN, negative infinity
+   * 0x7f8     for NAN, positive infinity
+   * 
+   * Because 0 occurs frequently, we reserve 
+   * exp of 0 for the value 0
+   *  
+   */
+  
+  private void writeFloat(int raw) throws IOException {
+    if (raw == 0) {
+      writeUnsignedByte(float_Exponent_dos, 0);
+      if (doMeasurements) {
+        sm.statDetails[float_Exponent_i].incr(1);
+      }
+      return;
+    }
+   
+    final int exponent = ((raw >>> 23) & 0xff) + 1;   // because we reserve 0, see above
+    final int revMants = Integer.reverse((raw & 0x007fffff) << 9);  
+    final int mants = (revMants << 1) + ((raw < 0) ? 1 : 0);
+    writeVnumber(float_Exponent_dos, exponent); 
+    writeVnumber(float_Mantissa_Sign_dos, mants);
+    if (doMeasurements) {
+      sm.statDetails[float_Exponent_i].incr(DataIO.lengthVnumber(exponent));
+      sm.statDetails[float_Mantissa_Sign_i].incr(DataIO.lengthVnumber(mants));
+    }
+  }
+
+  private void writeVnumber(int kind, int v) throws IOException {
+    DataIO.writeVnumber(dosZipSources[kind], v);
+    if (doMeasurements) {
+      sm.statDetails[kind].incr(DataIO.lengthVnumber(v));
+    }
+  }
+  
+  private void writeVnumber(int kind, long v) throws IOException {
+    DataIO.writeVnumber(dosZipSources[kind], v);
+    if (doMeasurements) {
+      sm.statDetails[kind].incr(DataIO.lengthVnumber(v));
+    }
+  }
+  
+  // this version doesn't do measurements, caller needs to do it
+  private void writeVnumber(DataOutputStream s, int v) throws IOException {
+    DataIO.writeVnumber(s, v);
+  }
+  
+  // this version doesn't do measurements, caller needs to do it
+  private void writeVnumber(DataOutputStream s, long v) throws IOException {
+    DataIO.writeVnumber(s, v);
+  }
+
+  // this version doesn't do measurements, caller needs to do it    
+  private void writeUnsignedByte(DataOutputStream s, int v) throws IOException {
+    s.write(v);
+  }
+
+  private void writeDouble(long raw) throws IOException {
+    if (raw == 0L) {
+      writeVnumber(double_Exponent_dos, 0);
+      if (doMeasurements) {
+        sm.statDetails[double_Exponent_i].incr(1);
+      }
+      return;
+    }
+    int exponent = (int)((raw >>> 52) & 0x7ff);
+    exponent = exponent - 1023; // rebase so 1.0 = 0
+    if (exponent >= 0) {
+      exponent ++; // skip "0", used above for 0 value
+    }
+    exponent = encodeIntSign(exponent);  
+    final long revMants = Long.reverse((raw & 0x000fffffffffffffL) << 12);  
+    final long mants = (revMants << 1) + ((raw < 0) ? 1 : 0);
+    writeVnumber(double_Exponent_dos, exponent);
+    writeVnumber(double_Mantissa_Sign_dos, mants);
+    if (doMeasurements) {
+      sm.statDetails[double_Exponent_i].incr(DataIO.lengthVnumber(exponent));
+      sm.statDetails[double_Mantissa_Sign_i].incr(DataIO.lengthVnumber(mants));
+    }
+  }
+  
+  private int encodeIntSign(int v) {
+    if (v < 0) {
+      return ((-v) << 1) | 1;
+    }
+    return (v << 1);
+  }
+
+  /**
+   * Encoding:
+   *    bit 6 = sign:   1 = negative
+   *    bit 7 = delta:  1 = delta
+   * @param kind
+   * @param i  runs from iHeap + 3 to end of array
+   * @throws IOException 
+   */
+  private void writeDiff(int kind, int v, int prev) throws IOException {
+    if (v == 0) {
+      write0(kind);
+      return;
+    }
+    
+    if (v == Integer.MIN_VALUE) { // special handling, because abs fails
+      writeVnumber(kind, 2);      // written as -0
+      if (doMeasurements) {
+        sm.statDetails[kind].diffEncoded ++;
+        sm.statDetails[kind].valueLeDiff ++;
+      }
+      return;
+    }
+  
+    // fsIndexes_i is for writing out modified FSs
+    if ((kind == heapRef_i) || (kind == fsIndexes_i)) {
+      // for heap refs, we write out the seq # instead
+      v = fsStartIndexes.getTgtSeqFromSrcAddr(v);
+      if (v == -1) { // this ref goes to some fs not in target, substitute null
+        if (kind == fsIndexes_i) {
+          // can't happen - delta ser never done with a tgtTs different from srcTs
+          throw new RuntimeException();
+        }
+        write0(kind); 
+        return;
+      }
+      if (prev != 0) {
+        prev = fsStartIndexes.getTgtSeqFromSrcAddr(prev);
+      }
+    }
+
+    final int absV = Math.abs(v);
+    if (((v > 0) && (prev > 0)) ||
+        ((v < 0) && (prev < 0))) {
+      final int diff = v - prev;  // guaranteed not to overflow
+      final int absDiff = Math.abs(diff);
+      writeVnumber(kind, 
+          (absV <= absDiff) ? 
+              ((long)absV << 2)    + ((v < 0) ? 2L : 0L) :
+              ((long)absDiff << 2) + ((diff < 0) ? 3L : 1L));
+      if (doMeasurements) {
+        sm.statDetails[kind].diffEncoded ++;
+        sm.statDetails[kind].valueLeDiff += (absV <= absDiff) ? 1 : 0;
+      }
+      return;
+    }
+    // if get here, then the abs v value is always <= the abs diff value.
+    writeVnumber(kind, ((long)absV << 2) + ((v < 0) ? 2 : 0));
+    if (doMeasurements) {
+      sm.statDetails[kind].diffEncoded ++;
+      sm.statDetails[kind].valueLeDiff ++;
+    }
+  }
+
+  private void write0(int kind) throws IOException {
+    writeVnumber(kind, 0);  // a speedup, not a new encoding
+    if (doMeasurements) {
+      sm.statDetails[kind].diffEncoded ++;
+      sm.statDetails[kind].valueLeDiff ++;
+    }    
+  }
+  private void writeFromByteArray(SlotKind kind, int startPos, int length) throws IOException {
+    byte_dos.write(byteHeapObj.heap, startPos, length);
+  }
+
+  private void writeFromLongArray(SlotKind kind, int startPos, int length) throws IOException {
+    final long[] h = longHeapObj.heap;
+    final int endPos = startPos + length;
+    long prev = 0;
+    for (int i = startPos; i < endPos; i++) {
+      final long e = h[i];
+      if (kind == Slot_DoubleRef) {
+        writeDouble(e);
+      } else {
+        writeLong(e, prev);
+        prev = e;
+      }
+    }
+  }
+  
+  private void writeFromShortArray(int startPos, int length) throws IOException {
+    final short[] h = shortHeapObj.heap;
+    final int endPos = startPos + length;
+    int prev = 0;
+    for (int i = startPos; i < endPos; i++) {
+      final short e = h[i];
+      writeDiff(short_i, e, prev);
+      prev = e;
+    }
+  }
+
+  /******************************************************************************
+   * Modified Values
+   * Output:
+   *   For each FS that has 1 or more modified values,
+   *     write the heap addr converted to a seq # of the FS
+   *     
+   *     For all modified values within the FS:
+   *       if it is an aux array element, write the index in the aux array and the new value
+   *       otherwise, write the slot offset and the new value
+   ******************************************************************************/
+  public class SerializeModifiedFSs {
+
+    final int[] modifiedMainHeapAddrs = cas.getModifiedFSHeapAddrs().toArray();
+    final int[] modifiedFSs = cas.getModifiedFSList().toArray();
+    final int[] modifiedByteHeapAddrs = cas.getModifiedByteHeapAddrs().toArray();
+    final int[] modifiedShortHeapAddrs = cas.getModifiedShortHeapAddrs().toArray();
+    final int[] modifiedLongHeapAddrs = cas.getModifiedLongHeapAddrs().toArray();
+
+    {sortModifications();}  // a non-static initialization block
+    
+    final int modMainHeapAddrsLength = eliminateDuplicatesInMods(modifiedMainHeapAddrs);
+    final int modFSsLength = eliminateDuplicatesInMods(modifiedFSs);
+    final int modByteHeapAddrsLength = eliminateDuplicatesInMods(modifiedByteHeapAddrs);
+    final int modShortHeapAddrsLength = eliminateDuplicatesInMods(modifiedShortHeapAddrs);
+    final int modLongHeapAddrsLength = eliminateDuplicatesInMods(modifiedLongHeapAddrs);
+
+    // ima           - index into modified arrays
+    // ixx, iPrevxxx - index in heap being changed
+    //                 value comes via the main heap or aux heaps
+      
+      int imaModMainHeap = 0;
+      int imaModByteRef = 0;
+      int imaModShortRef = 0;
+      int imaModLongRef = 0;
+ 
+      // previous value - for things diff encoded
+    int vPrevModInt = 0;
+    int vPrevModHeapRef = 0;
+    short vPrevModShort = 0;
+    long vPrevModLong = 0;
+    
+    int iHeap;
+    TypeInfo typeInfo;
+    
+    private void serializeModifiedFSs() throws IOException {
+      // write out number of modified Feature Structures
+      writeVnumber(control_dos, modFSsLength);
+      // iterate over all modified feature structures
+      /**
+       * Theorems about these data
+       *   1) Assumption: if an AuxHeap array is modified, its heap FS is in the list of modFSs
+       *   2) FSs with AuxHeap values have increasing ref values into the Aux heap as FS addr increases
+       *      (because the ref is not updateable).
+       *   3) Assumption: String array element modifications are main heap slot changes
+       *      and recorded as such
+       */
+      
+      for (int i = 0; i < modFSsLength; i++) {
+        iHeap = modifiedFSs[i];     
+        final int tCode = heap[iHeap];
+        typeInfo = ts.getTypeInfo(tCode);
+        
+        // write out the address of the modified FS
+        // will convert to seq# internally
+        writeDiff(fsIndexes_i, iHeap, iPrevHeap);
+        // delay updating iPrevHeap until end of "for" loop
+        
+        /**************************************************
+         * handle aux byte, short, long array modifications
+         **************************************************/
+        if (typeInfo.isArray && (!typeInfo.isHeapStoredArray)) {
+          writeAuxHeapMods();           
+        } else { 
+          writeMainHeapMods(); 
+        }  // end of processing 1 modified FS
+        iPrevHeap = iHeap;
+      }  // end of for loop over all modified FSs
+    }  // end of method
+    
+    // sort and remove duplicates
+    private void sortModifications() {
+      Arrays.sort(modifiedMainHeapAddrs);
+      Arrays.sort(modifiedFSs);
+      Arrays.sort(modifiedByteHeapAddrs);
+      Arrays.sort(modifiedShortHeapAddrs);
+      Arrays.sort(modifiedLongHeapAddrs);
+    }
+    
+    private int eliminateDuplicatesInMods(final int[] sorted) {
+      int length = sorted.length;
+      if (length < 2) {
+        return length;
+      }
+      
+      int prev = sorted[0];
+      int to = 1;
+      for(int from = 1; from < length; from++) {
+        int s = sorted[from];
+        if (s == prev) {
+          continue;
+        }
+        prev = s;
+        sorted[to] = s;
+        to++;
+      }    
+      return to;  // to is length
+    }
+
+    private int countModifiedSlotsInFs(int fsLength) {
+      return countModifiedSlots(iHeap, fsLength, modifiedMainHeapAddrs, imaModMainHeap, modMainHeapAddrsLength);
+    }
+    
+    private int countModifiedSlotsInAuxHeap(int[] modifiedAddrs, int indexInModAddrs, int length) {
+      return countModifiedSlots(heap[iHeap + 2], heap[iHeap + 1], modifiedAddrs, indexInModAddrs, length);
+    }
+    
+    private int countModifiedSlots(int firstAddr, int length, int[] modifiedAddrs, int indexInModAddrs, int modAddrsLength) {
+      if (0 == length) {
+        throw new RuntimeException();  // can't happen
+      }
+      final int nextAddr = firstAddr + length;
+      int nextModAddr = modifiedAddrs[indexInModAddrs]; 
+      if ((firstAddr > nextModAddr) ||
+          (nextModAddr >= nextAddr)) {
+        throw new RuntimeException(); // never happen - must have one slot at least modified in this fs          
+      }
+      int i = 1;
+      for (;; i++) {
+        if ((indexInModAddrs + i) == modAddrsLength) {
+          break;
+        }
+        nextModAddr = modifiedAddrs[indexInModAddrs + i];
+        if (nextModAddr >= nextAddr) {
+          break;
+        }
+      }
+      return i;
+    }
+    
+    private void writeMainHeapMods() throws IOException {
+      final int fsLength = incrToNextFs(heap, iHeap, typeInfo);
+      final int numberOfModsInFs = countModifiedSlotsInFs(fsLength);
+      writeVnumber(fsIndexes_dos, numberOfModsInFs);
+      int iPrevOffsetInFs = 0;
+
+      for (int i = 0; i < numberOfModsInFs; i++) {
+        final int nextMainHeapIndex = modifiedMainHeapAddrs[imaModMainHeap++];
+        final int offsetInFs = nextMainHeapIndex - iHeap;
+        
+        writeVnumber(fsIndexes_dos, offsetInFs - iPrevOffsetInFs);
+        iPrevOffsetInFs = offsetInFs;
+        
+        final SlotKind kind = typeInfo.getSlotKind(typeInfo.isArray ? 2 : offsetInFs);
+        
+        switch (kind) {
+        case Slot_HeapRef:
+          vPrevModHeapRef = writeIntOrHeapRef(heapRef_i, nextMainHeapIndex, vPrevModHeapRef);
+          break;
+        case Slot_Int:
+          vPrevModInt = writeIntOrHeapRef(int_i, nextMainHeapIndex, vPrevModInt);
+          break;
+        case Slot_Short:
+          vPrevModShort = (short)writeIntOrHeapRef(int_i, nextMainHeapIndex, vPrevModShort);             
+          break;
+        case Slot_LongRef:
+          vPrevModLong = writeLongFromHeapIndex(nextMainHeapIndex, vPrevModLong); 
+          break;
+        case Slot_Byte: case Slot_Boolean:
+          byte_dos.write(heap[nextMainHeapIndex]);
+          break;
+        case Slot_Float:
+          writeFloat(heap[nextMainHeapIndex]);
+          break;
+        case Slot_StrRef:
+          writeString(stringHeapObj.getStringForCode(heap[nextMainHeapIndex]));
+          break;
+        case Slot_DoubleRef:
+          writeDouble(longHeapObj.getHeapValue(heap[nextMainHeapIndex]));
+          break;
+        default:
+          throw new RuntimeException();
+        }
+
+      }  // end of looping for all modified slots in this FS
+    }
+    
+    private void writeAuxHeapMods() throws IOException {
+      final int auxHeapIndex = heap[iHeap + 2];
+      int iPrevOffsetInAuxArray = 0;
+      
+      final SlotKind kind = typeInfo.getSlotKind(2);  // get kind of element
+      final boolean isAuxByte = ((kind == Slot_BooleanRef) || (kind == Slot_ByteRef));
+      final boolean isAuxShort = (kind == Slot_ShortRef);
+      final boolean isAuxLong = ((kind == Slot_LongRef) || (kind == Slot_DoubleRef));
+      
+      if (!(isAuxByte | isAuxShort | isAuxLong)) {
+        throw new RuntimeException();  // never happen
+      }
+      
+      final int[] modXxxHeapAddrs = isAuxByte  ? modifiedByteHeapAddrs :
+                                    isAuxShort ? modifiedShortHeapAddrs :
+                                                 modifiedLongHeapAddrs;
+      final int modXxxHeapAddrsLength = isAuxByte  ? modByteHeapAddrsLength :
+                                        isAuxShort ? modShortHeapAddrsLength :
+                                                     modLongHeapAddrsLength;
+      int imaModXxxRef = isAuxByte  ? imaModByteRef :
+                               isAuxShort ? imaModShortRef : 
+                                            imaModLongRef;
+      
+      final int numberOfModsInAuxHeap = countModifiedSlotsInAuxHeap(modXxxHeapAddrs, imaModXxxRef, modXxxHeapAddrsLength);
+      writeVnumber(fsIndexes_dos, numberOfModsInAuxHeap);
+      
+      for (int i = 0; i < numberOfModsInAuxHeap; i++) {
+        final int nextModAuxIndex = modXxxHeapAddrs[imaModXxxRef++];
+        final int offsetInAuxArray = nextModAuxIndex - auxHeapIndex;
+        
+        writeVnumber(fsIndexes_dos, offsetInAuxArray - iPrevOffsetInAuxArray);
+        iPrevOffsetInAuxArray = offsetInAuxArray;
+        
+        if (isAuxByte) {
+          writeUnsignedByte(byte_dos, byteHeapObj.getHeapValue(nextModAuxIndex));
+        } else if (isAuxShort) {
+          final short v = shortHeapObj.getHeapValue(nextModAuxIndex);
+          writeDiff(int_i, v, vPrevModShort);
+          vPrevModShort = v;
+        } else {
+          long v = longHeapObj.getHeapValue(nextModAuxIndex);
+          if (kind == Slot_LongRef) {
+            writeLong(v, vPrevModLong);
+            vPrevModLong = v;    
+          } else {
+            writeDouble(v);
+          }
+        }
+        
+        if (isAuxByte) {
+          imaModByteRef++;
+        } else if (isAuxShort) {
+          imaModShortRef++;
+        } else {
+          imaModLongRef++;
+        }
+        
+      }
+    }
+  } // end of class definition for SerializeModifiedFSs
+          
+  /*************************************************************************************
+   *   D E S E R I A L I Z E
+   *************************************************************************************/   
+
+  public void deserialize(InputStream istream) throws IOException {
+    deserIn = (istream instanceof DataInputStream) ? (DataInputStream) istream
+        : new DataInputStream(istream);
+
+    readHeader();
+
+    if (isReadingDelta) {
+      if (null == foundFSs) {
+        throw new UnsupportedOperationException("Deserializing Delta Cas, but original not serialized from");
+      }
+    } else {
+      cas.resetNoQuestions();
+    }
+
+    deserializeAfterVersion(deserIn, isReadingDelta);
+  }
+  
+  public void deserializeAfterVersion(DataInputStream istream, boolean isDelta) throws IOException {
+
+    deserIn = istream;
+    this.isDelta = isReadingDelta = isDelta;
+    setupReadStreams();
+    
+    /************************************************
+     * Read in the common string(s)
+     ************************************************/
+    int lenCmnStrs = readVnumber(strChars_dis);
+    readCommonString = new String[lenCmnStrs];
+    for (int i = 0; i < lenCmnStrs; i++) {
+      readCommonString[i] = DataIO.readUTFv(strChars_dis);
+    }
+    only1CommonString = lenCmnStrs == 1;
+    /***************************
+     * Prepare to walk main heap
+     ***************************/
+    int heapUsedInTarget = readVnumber(control_dis);         
+    final Heap heapObj = cas.getHeap();
+    
+    heapStart = isReadingDelta ? heapObj.getNextId() : 0;
+    stringTableOffset = isReadingDelta ? (stringHeapObj.getSize() - 1) : 0;
+    
+    // This next is just an approximation - it may be too large or too small if type mapping is in effect.
+    // Subsequent code needs to check on each FS if there's enough space left.  If not, it needs to grow.
+    // At end, the next avail position in the heap needs to be set to its actual value,
+    //   in case the amount stored was smaller.
+    if (isReadingDelta) {
+      heapObj.grow(heapUsedInTarget);
+    } else {
+      heapObj.reinitSizeOnly(heapUsedInTarget);
+    } 
+    heap = heapObj.heap;  // because it may change if growing or reinit
+    
+    Arrays.fill(iPrevHeapArray, 0);
+    
+    if (heapStart == 0) {
+      heapStart = 1;  // slot 0 not serialized, it's null / 0
+    }
+
+    // For Delta CAS,
+    //   Reuse previously computed map of addr <--> seq for existing FSs below mark line
+    //                             map of seq(this CAS) <--> seq(incoming) 
+    //                               that accounts for type code mismatch using typeMapper
+    // note: rest of maps computed incrementally as we deserialize
+    //   Two possibilities:  The CAS has a type, but the incoming is missing that type (services)
+    //                       The incoming has a type, but the CAS is missing it - (deser from file)
+    //     Below the merge line: only the 1st is possible
+    //     Above the merge line: only the 2nd is possible
+
+    if (isReadingDelta) {
+      // scan current source being added to / merged into
+      if (fsStartIndexes.getNumberSrcFss() == 1) {
+        throw new IllegalStateException("Reading Delta into CAS not serialized from");
+      }
+    }
+
+    fixupsNeeded = new IntVector(Math.max(16, heap.length / 10));
+
+    /**********************************************************
+     * Read in new FSs being deserialized and add them to heap
+     **********************************************************/
+    for (int iHeap = heapStart, targetHeapUsed = isReadingDelta ? 0 : 1; targetHeapUsed < heapUsedInTarget;) {
+      final int tgtTypeCode = readVnumber(typeCode_dis); // get type code
+      final int srcTypeCode = isTypeMapping ? typeMapper.mapTypeCodeTgt2Src(tgtTypeCode) : tgtTypeCode;
+      final boolean storeIt = (srcTypeCode != 0);
+      if (storeIt) {
+        heap[iHeap] = srcTypeCode;
+      }
+      fsStartIndexes.addSrcAddrForTgt(iHeap, storeIt);
+        // A receiving client from a service always
+        // has a superset of the service's types due to type merging so this
+        // won't happen for that use case. But 
+        // a deserialize-from-file could hit this if the receiving type system
+        // deleted a type.
+      
+        // The strategy for deserializing heap refs depends on finding
+        // the prev value for that type.  This can be skipped for 
+        // types being skipped because they don't exist in the source
+      
+      // typeInfo is Target Type Info
+      typeInfo = isTypeMapping ? tgtTs.getTypeInfo(tgtTypeCode) :
+                                 ts.getTypeInfo(srcTypeCode);
+      final TypeInfo srcTypeInfo = 
+        (!isTypeMapping) ? typeInfo : 
+        storeIt ?       ts.getTypeInfo(srcTypeCode) : 
+                        null;
+      iPrevHeap = iPrevHeapArray[srcTypeCode];  // will be ignored for non-existant type
+
+      if (typeInfo.isHeapStoredArray) {
+        readHeapStoredArray(iHeap, storeIt);
+      } else if (typeInfo.isArray) {
+        readNonHeapStoredArray(iHeap, storeIt);
+      } else {
+        // is normal type with slots
+        if (isTypeMapping && storeIt) {
+          final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(srcTypeCode);
+          for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
+            final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1;
+            SlotKind kind = typeInfo.getSlotKind(i+1);
+            readByKind(iHeap, featOffsetInSrc, kind, storeIt);
+          }
+        } else {
+          for (int i = 1; i < typeInfo.slotKinds.length + 1; i++) {
+            SlotKind kind = typeInfo.getSlotKind(i);
+            readByKind(iHeap, i, kind, storeIt);
+          }
+        }
+      }
+      if (storeIt) {
+        iPrevHeapArray[srcTypeCode] = iHeap;  // make this one the "prev" one for subsequent testing
+      }
+//       todo need to incr src heap by amt filtered (in case some slots missing, 
+//                 need to incr tgt (for checking end) by unfiltered amount
+//                 need to fixup final heap to account for skipped slots
+//                 need to have read skip slots not present in src
+      targetHeapUsed += incrToNextFs(heap, iHeap, typeInfo);  // typeInfo is target type info
+      iHeap += storeIt ? incrToNextFs(heap, iHeap, srcTypeInfo) : 0;
+    }
+    
+    final int end = fixupsNeeded.size();
+    for (int i = 0; i < end; i++) {
+      final int heapAddrToFix = fixupsNeeded.get(i);
+      heap[heapAddrToFix] = fsStartIndexes.getSrcAddrFromTgtSeq(heap[heapAddrToFix]);
+    }        
+    
+    readIndexedFeatureStructures();
+
+    if (isReadingDelta) {
+      (new ReadModifiedFSs()).readModifiedFSs();
+    }
+
+    closeDataInputs();
+//      System.out.format("Deserialize took %,d ms%n", System.currentTimeMillis() - startTime1);
+  }
+  
+  private void readNonHeapStoredArray(int iHeap, boolean storeIt) throws IOException {
+    final int length = readArrayLength(iHeap, storeIt);
+    if (length == 0) {
+      return;
+    }
+    SlotKind refKind = typeInfo.getSlotKind(2);
+    switch (refKind) {
+    case Slot_BooleanRef: case Slot_ByteRef:
+      final int byteRef =  readIntoByteArray(length, storeIt);
+      if (storeIt) {
+        heap[iHeap + 2] = byteRef;
+      }
+      break; 
+    case Slot_ShortRef:
+      final int shortRef = readIntoShortArray(length, storeIt);
+      if (storeIt) {
+        heap[iHeap + 2] = shortRef;
+      }
+      break; 
+    case Slot_LongRef: case Slot_DoubleRef:
+      final int longDblRef = readIntoLongArray(refKind, length, storeIt);
+      if (storeIt) {
+        heap[iHeap + 2] = longDblRef;
+      }
+      break; 
+    default:
+      throw new RuntimeException();
+    }
+  }
+  
+  private int readArrayLength(int iHeap, boolean storeIt) throws IOException {
+    final int v =  readVnumber(arrayLength_dis);
+    if (storeIt) {
+      heap[iHeap + 1] = v;
+    }
+    return v;
+  }
+
+  private void readHeapStoredArray(int iHeap, boolean storeIt) throws IOException {
+    final int length = readArrayLength(iHeap, storeIt);
+    // output values
+    // special case 0 and 1st value
+    if (length == 0) {
+      return;
+    }
+    SlotKind arrayElementKind = typeInfo.slotKinds[1];
+    final int endi = iHeap + length + 2;
+    switch (arrayElementKind) {
+    case Slot_HeapRef: case Slot_Int: case Slot_Short:
+      {
+        int prev = (iPrevHeap == 0) ? 0 : 
+                   (heap[iPrevHeap + 1] == 0) ? 0 : // prev array length = 0
+                    heap[iPrevHeap + 2]; // prev array 0th element
+        for (int i = iHeap + 2; i < endi; i++) {
+          final int v = readDiff(arrayElementKind, prev);
+          if (storeIt) {
+            heap[i] = v;
+            if (arrayElementKind == Slot_HeapRef) {
+              fixupsNeeded.add(i);
+            }
+          }
+          prev = v;
+        }
+      }
+      break;
+    case Slot_Float: 
+      for (int i = iHeap + 2; i < endi; i++) {
+        final int floatRef = readFloat();
+        if (storeIt) {
+          heap[i] = floatRef;
+        }
+      }
+      break;
+    case Slot_StrRef:
+      for (int i = iHeap + 2; i < endi; i++) {
+        final int strRef = readString(storeIt); 
+        if (storeIt) {
+          heap[i] = strRef; 
+        }
+      }
+      break;
+      
+    default: throw new RuntimeException("internal error");
+    } // end of switch    
+  }
+  
+  /**
+   *       
+   * @param iHeap
+   * @param offset can be -1 - in which case read, but don't store
+   * @throws IOException
+   */
+  private void readByKind(int iHeap, int offset, SlotKind kind, boolean storeIt) throws IOException {
+    
+    if (offset == -1) {
+      storeIt = false;
+    }
+    switch (kind) {
+    case Slot_Int: case Slot_Short:
+      readDiffWithPrevTypeSlot(kind, iHeap, offset, storeIt);
+      break;
+    case Slot_Float:
+      final int floatAsInt = readFloat();
+      if (storeIt) {
+        heap[iHeap + offset] = floatAsInt;
+      }
+      break;
+    case Slot_Boolean: case Slot_Byte:
+      final byte vByte = byte_dis.readByte();
+      if (storeIt) {
+        heap[iHeap + offset] = vByte;
+      }
+      break;
+    case Slot_HeapRef:
+      readDiffWithPrevTypeSlot(kind, iHeap, offset, storeIt);
+      fixupsNeeded.add(iHeap + offset);
+      break;
+    case Slot_StrRef: 
+      final int vStrRef = readString(storeIt);
+      if (storeIt) {
+        heap[iHeap + offset] = vStrRef;
+      }
+      break;
+    case Slot_LongRef: {
+      long v = readLong(kind, (iPrevHeap == 0) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
+      if (v == 0L) {
+        if (longZeroIndex == -1) {
+          longZeroIndex = longHeapObj.addLong(0L);
+        }
+        if (storeIt) {
+          heap[iHeap + offset] = longZeroIndex;
+        }
+      } else {
+        if (storeIt) {
+          heap[iHeap + offset] = longHeapObj.addLong(v);
+        }
+      }
+      break;
+    }
+    case Slot_DoubleRef: {
+      long v = readDouble();
+      if (v == 0L) {
+        if (longZeroIndex == -1) {
+          longZeroIndex = longHeapObj.addLong(0L);
+        }
+        if (storeIt) {
+          heap[iHeap + offset] = longZeroIndex;
+        }
+      } else if (v == DBL_1) {
+        if (double1Index == -1) {
+          double1Index = longHeapObj.addLong(DBL_1);
+        }
+        if (storeIt) {
+          heap[iHeap + offset] = double1Index;
+        }
+      } else {
+        if (storeIt) {
+          heap[iHeap + offset] = longHeapObj.addLong(v);
+        }
+      }
+      break;
+    }
+    default: 
+      throw new RuntimeException("internal error");                
+    } // end of switch
+  }
+
+  private void readIndexedFeatureStructures() throws IOException {
+    final int nbrViews = readVnumber(control_dis);
+    final int nbrSofas = readVnumber(control_dis);
+
+    IntVector fsIndexes = new IntVector(nbrViews + nbrSofas + 100);
+    fsIndexes.add(nbrViews);
+    fsIndexes.add(nbrSofas);
+    for (int i = 0; i < nbrSofas; i++) {
+      fsIndexes.add(readVnumber(control_dis));
+    }
+      
+    for (int i = 0; i < nbrViews; i++) {
+      readFsxPart(fsIndexes);     // added FSs
+      if (isDelta) {
+        readFsxPart(fsIndexes);   // removed FSs
+        readFsxPart(fsIndexes);   // reindexed FSs
+      }
+    }
+    
+    if (isDelta) {
+      // getArray avoids copying.
+      // length is too long, but extra is never accessed
+      cas.reinitDeltaIndexedFSs(fsIndexes.getArray());
+    } else {
+      cas.reinitIndexedFSs(fsIndexes.getArray());
+    }
+  }
+
+  /**
+   * Each FS index is sorted, and output is by delta 
+   */
+  private void readFsxPart(IntVector fsIndexes) throws IOException {
+    final int nbrEntries = readVnumber(control_dis);
+    int nbrEntriesAdded = 0;
+    final int indexOfNbrAdded = fsIndexes.size();
+    fsIndexes.add(0);  // a place holder, will be updated at end      
+    int prev = 0;
+    
+    for (int i = 0; i < nbrEntries; i++) {
+      int v = readVnumber(fsIndexes_dis) + prev;
+      prev = v;
+      v = fsStartIndexes.getSrcAddrFromTgtSeq(v);
+      if (v > 0) {  // if not, no src type for this type in tgtTs
+        nbrEntriesAdded++;
+        fsIndexes.add(v);
+      }
+    }
+    fsIndexes.set(indexOfNbrAdded, nbrEntriesAdded);
+  } 
+
+  
+  private DataInput getInputStream(SlotKind kind) {
+    return dataInputs[kind.ordinal()];
+  }
+
+  private int readVnumber(DataInputStream dis) throws IOException {
+    return DataIO.readVnumber(dis);
+  }
+
+  private long readVlong(DataInputStream dis) throws IOException {
+    return DataIO.readVlong(dis);
+  }
+
+  private int readIntoByteArray(int length, boolean storeIt) throws IOException { 
+    if (storeIt) {
+      final int startPos = byteHeapObj.reserve(length);
+      byte_dis.readFully(byteHeapObj.heap, startPos, length);
+      return startPos;
+    } else {
+      byte_dis.skipBytes(length);
+      return 0;
+    }
+  }
+
+  private int readIntoShortArray(int length, boolean storeIt) throws IOException {
+    if (storeIt) {
+      final int startPos = shortHeapObj.reserve(length);
+      final short[] h = shortHeapObj.heap;
+      final int endPos = startPos + length;
+      short prev = 0;
+      for (int i = startPos; i < endPos; i++) {
+        h[i] = prev = (short)(readDiff(short_dis, prev));
+      }
+      return startPos;
+    } else {
+      short_dis.skipBytes(length * 2);
+      return 0;
+    }
+  }
+  
+  private int readIntoLongArray(SlotKind kind, int length, boolean storeIt) throws IOException {
+    if (storeIt) {
+      final int startPos = longHeapObj.reserve(length);
+      final long[] h = longHeapObj.heap;
+      final int endPos = startPos + length;
+      long prev = 0;
+      for (int i = startPos; i < endPos; i++) {
+        h[i] = prev = readLong(kind, prev);
+      }
+      return startPos;
+    } else {
+      skipLong(length);
+      return 0;
+    }
+  }
+
+  private void readDiffWithPrevTypeSlot(
+      SlotKind kind, 
+      int iHeap, 
+      int offset,
+      boolean storeIt) throws IOException {
+    if (storeIt) {
+      int prev = (iPrevHeap == 0) ? 0 : heap[iPrevHeap + offset];
+      heap[iHeap + offset] = readDiff(kind, prev);
+    } else {
+      readDiff(kind, 0);
+    }
+  }
+
+  private int readDiff(SlotKind kind, int prev) throws IOException {
+    return readDiff(getInputStream(kind), prev);
+  }
+  
+  private int readDiff(DataInput in, int prev) throws IOException {
+    final long encoded = readVlong(in);
+    final boolean isDeltaEncoded = (0 != (encoded & 1L));
+    final boolean isNegative = (0 != (encoded & 2L));
+    int v = (int)(encoded >>> 2);
+    if (isNegative) {
+      if (v == 0) {
+        return Integer.MIN_VALUE;
+      }
+      v = -v;
+    }
+    if (isDeltaEncoded) {
+      v = v + prev;
+    }
+    return v;
+
+  }
+      
+  private long readLong(SlotKind kind, long prev) throws IOException {
+    if (kind == Slot_DoubleRef) {
+      return readDouble();
+    }
+  
+    final int vh = readDiff(long_High_dis, (int) (prev >>> 32));
+    final int vl = readDiff(long_Low_dis, (int) prev);
+    final long v = (((long)vh) << 32) | (0xffffffffL & (long)vl);
+    return v;
+  }
+  
+  private void skipLong(int length) throws IOException {
+    for (int i = 0; i < length; i++) {
+      long_High_dis.skipBytes(8);
+      long_Low_dis.skipBytes(8);
+    }
+  }
+     
+  private int readFloat() throws IOException {
+    final int exponent = readVnumber(float_Exponent_dis);  
+    if (exponent == 0) {
+      return 0;
+    }
+    int mants = readVnumber(float_Mantissa_Sign_dis);
+    final boolean isNegative = (mants & 1) == 1;
+    mants = mants >>> 1;
+    // the next parens needed to get around eclipse / java bug
+    mants = (Integer.reverse(mants) >>> 9);
+        
+    return ((exponent - 1) << 23) |
+           mants | 
+           ((isNegative) ? 0x80000000 : 0);        
+  }
+     
+  private int decodeIntSign(int v) {
+    if (1 == (v & 1)) {
+      return - (v >>> 1);
+    }
+    return v >>> 1;
+  }
+  
+  private long readDouble() throws IOException {
+    int exponent = readVnumber(double_Exponent_dis);
+    if (exponent == 0) {
+      return 0L;
+    }
+    long mants = readVlong(double_Mantissa_Sign_dis);
+    return decodeDouble(mants, exponent);
+  }
+  
+  private long decodeDouble(long mants, int exponent) {
+    exponent = decodeIntSign(exponent);
+    if (exponent > 0) {
+      exponent --;  
+    }
+    exponent = exponent + 1023; 
+    long r = ((long)((exponent) & 0x7ff)) << 52;
+    final boolean isNegative = (1 == (mants & 1));
+    mants = Long.reverse(mants >>> 1) >>> 12;
+    r = r | mants | (isNegative ? 0x8000000000000000L : 0);
+    return r;
+  }
+          
+  private long readVlong(DataInput dis) throws IOException {
+    return DataIO.readVlong(dis);
+  }
+    
+  private int readString(boolean storeIt) throws IOException {
+    int length = decodeIntSign(readVnumber(strLength_dis));
+    if (0 == length) {
+      return 0;
+    }
+    if (1 == length) {
+      if (storeIt) {
+        return stringHeapObj.addString("");
+      } else {
+        return 0;
+      }
+    }
+    
+    if (length < 0) {  // in this case, -length is the slot index
+      if (storeIt) {
+        return stringTableOffset - length;
+      } else {
+        return 0;
+      }
+    }
+    int offset = readVnumber(strOffset_dis);
+    int segmentIndex = (only1CommonString) ? 0 :
+      readVnumber(strSeg_dis);
+    if (storeIt) {
+      String s =  readCommonString[segmentIndex].substring(offset, offset + length - 1);
+      return stringHeapObj.addString(s);
+    } else {
+      return 0;
+    }
+  }
+
+  /******************************************************************************
+   * Modified Values
+   * 
+   * Modified heap values need fsStartIndexes conversion
+   ******************************************************************************/
+
+  class ReadModifiedFSs {
+    
+    // previous value - for things diff encoded
+    int vPrevModInt = 0;
+    int prevModHeapRefTgtSeq = 0;
+    short vPrevModShort = 0;
+    long vPrevModLong = 0;
+    int iHeap;
+    TypeInfo typeInfo;
+    int[] tgtF2srcF;
+    
+    // for handling aux heaps with type mapping which may skip some things in the target
+    //   An amount that needs to be added to the offset from target to account for
+    //   source types and features not in the target.
+    //
+    // Because this is only done for Delta CAS, it is guaranteed that the 
+    //   target cannot contain types or features that are not in the source
+    //   (due to type merging)
+//      int[] srcHeapIndexOffset; 
+//      
+//      Iterator<AuxSkip>[] srcSkipIt;  // iterator over skip points
+//      AuxSkip[] srcNextSkipped;  // next skipped
+//      int[] srcNextSkippedIndex;
+
+    private void readModifiedFSs() throws IOException {
+      final int modFSsLength = readVnumber(control_dis);
+      int prevSeq = 0;
+      
+//        if (isTypeMapping) {
+//          for (int i = 0; i < AuxHeapsCount; i++) {
+//            srcHeapIndexOffset[i] = 0;
+//            srcSkipIt[i] = fsStartIndexes.skips.get(i).iterator();
+//            srcNextSkipped[i] = (srcSkipIt[i].hasNext()) ? srcSkipIt[i].next() : null;
+//            srcNextSkippedIndex[i] = (srcNextSkipped[i] == null) ? Integer.MAX_VALUE : srcNextSkipped[i].skipIndex;
+//          }
+//        }
+               
+      for (int i = 0; i < modFSsLength; i++) {
+        final int seqNbrModified = readDiff(fsIndexes_dis, prevSeq);
+//          iHeap = readVnumber(fsIndexes_dis) + iPrevHeap;
+        prevSeq = seqNbrModified;
+//          iPrevHeap = iHeap;
+
+        iHeap = fsStartIndexes.getSrcAddrFromTgtSeq(seqNbrModified);
+        if (iHeap < 1) {
+          // never happen because delta CAS ts system case, the 
+          //   target is always a subset of the source
+          //   due to type system merging
+          throw new RuntimeException("never happen");
+        }
+        final int tCode = heap[iHeap];
+        typeInfo = ts.getTypeInfo(tCode);
+        if (isTypeMapping) {
+          tgtF2srcF = typeMapper.getTgtFeatOffsets2Src(tCode);
+        }
+        
+        final int numberOfModsInThisFs = readVnumber(fsIndexes_dis); 
+
+        if (typeInfo.isArray && (!typeInfo.isHeapStoredArray)) { 
+          /**************************************************
+           * handle aux byte, short, long array modifications
+           *   Note: boolean stored in byte array
+           *   Note: strings are heap-store-arrays
+           **************************************************/
+           readModifiedAuxHeap(numberOfModsInThisFs);
+        } else {
+          readModifiedMainHeap(numberOfModsInThisFs);
+        }
+      }
+    }
+    
+    // update the byte/short/long aux heap entries
+    // for arrays
+    /**
+     * update the byte/short/long aux heap entries
+     * Only called for arrays
+     * No aux heap offset adjustments needed since we get
+     *   the accuract source start point from the source heap
+     */
+    private void readModifiedAuxHeap(int numberOfMods) throws IOException {
+      int prevOffset = 0;      
+      final int auxHeapIndex = heap[iHeap + 2];
+      final SlotKind kind = typeInfo.getSlotKind(2);  // get kind of element
+      final boolean isAuxByte = ((kind == Slot_BooleanRef) || (kind == Slot_ByteRef));
+      final boolean isAuxShort = (kind == Slot_ShortRef);
+      final boolean isAuxLong = ((kind == Slot_LongRef) || (kind == Slot_DoubleRef));
+      if (!(isAuxByte | isAuxShort | isAuxLong)) {
+        throw new RuntimeException();  // never happen
+      }
+      
+      for (int i2 = 0; i2 < numberOfMods; i2++) {
+        final int offset = readVnumber(fsIndexes_dis) + prevOffset;
+        prevOffset = offset;
+        
+        if (isAuxByte) {
+          byteHeapObj.setHeapValue(byte_dis.readByte(), auxHeapIndex + offset);
+        } else if (isAuxShort) {
+          final short v = (short)readDiff(int_dis, vPrevModShort);
+          vPrevModShort = v;
+          shortHeapObj.setHeapValue(v, auxHeapIndex + offset);
+        } else {
+          final long v = readLong(kind, vPrevModLong);
+          if (kind == Slot_LongRef) {
+            vPrevModLong = v;
+          }
+          longHeapObj.setHeapValue(v, auxHeapIndex + offset);
+        }    
+      }
+    }
+    
+    private void readModifiedMainHeap(int numberOfMods) throws IOException {
+      int iPrevTgtOffsetInFs = 0;
+      for (int i = 0; i < numberOfMods; i++) {
+        final int tgtOffsetInFs = readVnumber(fsIndexes_dis) + iPrevTgtOffsetInFs;
+        iPrevTgtOffsetInFs = tgtOffsetInFs;
+        final int srcOffsetInFs = isTypeMapping ? tgtF2srcF[tgtOffsetInFs] : tgtOffsetInFs;
+        if (srcOffsetInFs < 0) {
+          // never happen because if type mapping, and delta cas being deserialized,
+          //   all of the target features would have been merged into the source ones.
+          throw new RuntimeException();
+        }
+        final SlotKind kind = typeInfo.getSlotKind(typeInfo.isArray ? 2 : srcOffsetInFs);
+        
+        switch (kind) {
+        case Slot_HeapRef: {
+            final int tgtSeq = readDiff(heapRef_dis, prevModHeapRefTgtSeq);
+            prevModHeapRefTgtSeq = tgtSeq;
+            final int v = fsStartIndexes.getSrcAddrFromTgtSeq(tgtSeq);
+            heap[iHeap + srcOffsetInFs] = v;
+          }
+          break;
+        case Slot_Int: {
+            final int v = readDiff(int_dis, vPrevModInt);
+            vPrevModInt = v;
+            heap[iHeap + srcOffsetInFs] = v;
+          }
+          break;
+        case Slot_Short: {
+            final int v = readDiff(int_dis, vPrevModShort);
+            vPrevModShort = (short)v;
+            heap[iHeap + srcOffsetInFs] = v;
+          }
+          break;
+        case Slot_LongRef: case Slot_DoubleRef: {
+            final long v = readLong(kind, vPrevModLong);
+            if (kind == Slot_LongRef) {
+              vPrevModLong = v;
+            }
+            heap[iHeap + srcOffsetInFs] = longHeapObj.addLong(v);
+          }
+          break;
+        case Slot_Byte: case Slot_Boolean:
+          heap[iHeap + tgtOffsetInFs] = byte_dis.readByte();
+          break;
+        case Slot_Float:
+          heap[iHeap + tgtOffsetInFs] = readFloat();
+          break;
+        case Slot_StrRef:
+          heap[iHeap + tgtOffsetInFs] = readString(true);
+          break;
+       default:
+          throw new RuntimeException();
+        }
+      }   
+    }
+  }
+  
+
+  /********************************************************************
+   * methods common to serialization / deserialization etc.
+   ********************************************************************/
+  
+  
+  private static int incrToNextFs(int[] heap, int iHeap, TypeInfo typeInfo) {
+    if (typeInfo.isHeapStoredArray) {
+      return 2 + heap[iHeap + 1];
+    } else {
+      return 1 + typeInfo.slotKinds.length;
+    }
+  }
+
+  /**
+   * This routine uses the same "scanning" to do two completely different things:
+   *   The first thing is to generate an ordered set (by heap addr) 
+   *   of all FSs that are to be serialized:
+   *     because they are in some index, or
+   *     are pointed to by something that is in some index (recursively)
+   *   
+   *   The second thing is to serialize out the index information.
+   *   This step has to wait until the first time call has completed and 
+   *   the fsStartIndexes instance has a chance to be built.
+   * 
+   * The cas is passed in so that the Compare can use this for two different CASes
+   * 
+   * @throws IOException
+   */
+  private void processIndexedFeatureStructures(CASImpl cas, boolean isWrite) throws IOException {
+    if (!isWrite) {
+      foundFSs = new IntArrayRBT();
+    }
+    final int[] fsIndexes = isSerializingDelta ? cas.getDeltaIndexedFSs(mark) : cas.getIndexedFSs();
+    final int nbrViews = fsIndexes[0];
+    final int nbrSofas = fsIndexes[1];
+
+    if (isWrite) {
+      if (doMeasurements) {
+        sm.statDetails[fsIndexes_i].original = fsIndexes.length * 4 + 1;      
+      }
+      writeVnumber(control_i, nbrViews);
+      writeVnumber(control_i, nbrSofas);
+      if (doMeasurements) {
+        sm.statDetails[fsIndexes_i].incr(1); // an approximation - probably correct
+        sm.statDetails[fsIndexes_i].incr(1);
+      }
+    }
+        
+    int fi = 2;
+    final int end1 = nbrSofas + 2;
+    for (; fi < end1; fi++) {
+//      writeVnumber(control_i, fsIndexes[fi]);  // version 0
+      final int addrSofaFs = fsIndexes[fi];
+      if (isWrite) {
+        final int v = fsStartIndexes.getTgtSeqFromSrcAddr(addrSofaFs);
+        writeVnumber(control_i, v);    // version 1
+         
+        if (doMeasurements) {
+          sm.statDetails[fsIndexes_i].incr(DataIO.lengthVnumber(v));
+        }
+      } else {
+        enqueueFS(foundFSs, addrSofaFs);
+      }
+    }
+     
+    for (int vi = 0; vi < nbrViews; vi++) {
+      fi = processFsxPart(fsIndexes, fi, foundFSs, isWrite);    // added FSs
+      if (isWrite && isSerializingDelta) {
+        fi = processFsxPart(fsIndexes, fi, null, false);  // removed FSs
+        fi = processFsxPart(fsIndexes, fi, null, false);  // reindexed FSs
+      }
+    } 
+    return;
+  }
+
+  private int processFsxPart(
+      int[] fsIndexes, 
+      int fsNdxStart,
+      IntArrayRBT foundFSs, 
+      boolean isWrite) throws IOException {
+    int ix = fsNdxStart;
+    final int nbrEntries = fsIndexes[ix++];
+    final int end = ix + nbrEntries;
+    // version 0
+//      writeVnumber(fsIndexes_dos, nbrEntries);  // number of entries
+    //version 1: the list is filtered by the tgt type, and may be smaller;
+    //  it is written at the end, into the control_dos stream
+//      if (doMeasurements) {
+//        sm.statDetails[typeCode_i].incr(DataIO.lengthVnumber(nbrEntries));
+//      }
+    
+    final int[] ia = new int[nbrEntries];
+    // Arrays are sorted, because order doesn't matter to the logic, but
+    // sorted arrays can be compressed via diff encoding better
+    System.arraycopy(fsIndexes, ix, ia, 0, nbrEntries);
+    Arrays.sort(ia);
+   
+    int prev = 0;
+    int entriesWritten = 0;  // can be less than nbrEntries if type mapping excludes some types in target
+    
+    for (int i = 0; i < ia.length; i++) {
+      final int fsAddr = ia[i];
+      // skip if not in target
+//        if (!isTypeMapping || (0 != typeMapper.mapTypeCodeSrc2Tgt(heap[fsAddr]))) {
+      if (isWrite) {
+        final int tgtV = fsStartIndexes.getTgtSeqFromSrcAddr(fsAddr);
+        if (tgtV == -1) {
+          continue; // skip - the target doesn't have this Fs
+        }
+        final int delta = tgtV - prev;
+        entriesWritten++;
+        writeVnumber(fsIndexes_dos, delta);
+        if (doMeasurements) {
+          sm.statDetails[fsIndexes_i].incr(DataIO.lengthVnumber(delta));
+        }
+        prev = tgtV;
+      } else {    
+        enqueueFS(foundFSs, fsAddr);
+      }
+    }
+    if (isWrite) {
+      writeVnumber(control_dos, entriesWritten);  // version 1  
+      if (doMeasurements) {
+        sm.statDetails[typeCode_i].incr(DataIO.lengthVnumber(entriesWritten));
+      }
+    }
+    return end;
+  } 
+
+  private void enqueueFS(IntArrayRBT foundFSs, int fsAddr) {
+    if (null == foundFSs) {
+      return;
+    }
+    if (0 != fsAddr) {
+      if (!foundFSs.containsKey(fsAddr)) {
+        if (!isSerializingDelta || mark.isNew(fsAddr)) {
+          foundFSs.insertKey(fsAddr);
+          enqueueFeatures(foundFSs, fsAddr);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Enqueue all FSs reachable from features of the given FS.
+   */
+  private void enqueueFeatures(IntArrayRBT foundFSs, int addr) {
+    final int tCode = heap[addr];
+    final TypeInfo typeInfo = ts.getTypeInfo(tCode);
+    final SlotKind[] kinds = typeInfo.slotKinds;
+    
+    if (typeInfo.isHeapStoredArray && (Slot_HeapRef == kinds[1])) {
+      // fs array, add elements
+      final int length = heap[addr + 1];
+      for (int i = 0; i < length; i++) {
+        enqueueFS(foundFSs, heap[addr + 2 + i]);
+      }
+      return;
+    }
+    
+    // not an FS Array
+    if (typeInfo.isArray) {
+      return;
+    }
+  
+    if (isTypeMapping) {
+      final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(tCode);
+      for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
+        final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1;  // add one for origin 1
+        if (featOffsetInSrc == 0) {
+          throw new RuntimeException(); // never happen because for serialization, target is never a superset of features of src
+        }
+        if (kinds[featOffsetInSrc - 1] == Slot_HeapRef) {
+          enqueueFS(foundFSs, heap[addr + featOffsetInSrc]);
+        }
+      }
+    } else {
+      for (int i = 1; i < typeInfo.slotKinds.length + 1; i++) {
+        if (kinds[i - 1] == Slot_HeapRef) {
+          enqueueFS(foundFSs, heap[addr + i]);
+        }
+      }
+    }
+  }
+  
+  
+  /**
+   * Serializing:
+   *   Called at beginning, scans whole CAS
+   * Deserializing (Delta Cas only):
+   *   Called at beginning if doing delta CAS, scans old CAS up to mark
+   * @param fsStartIndexes
+   * @param srcHeap
+   * @param srcHeapStart
+   * @param srcHeapEnd
+   * @param histo
+   * @param optimizeStrings
+   * @param stringHeapObj
+   * @return amount of heap used in target, side effect: set up fsStartIndexes (for both src and tgt)
+   */
+  private int initFsStartIndexes () {
+   
+    final boolean isTypeMapping = isTypeMappingCmn;
+    final CasTypeSystemMapper typeMapper = typeMapperCmn;
+    final IntListIterator foundFSsIterator = foundFSs.iterator();
+    
+    int tgtHeapUsed = 0;
+    int nextTgtHeap = 1;
+    int markStringHeap = (isDelta) ? mark.getNextStringHeapAddr() : 0;
+    
+    while (foundFSsIterator.hasNext()) {
+      final int iSrcHeap = foundFSsIterator.next();
+      final int iTgtHeap = nextTgtHeap;
+      final int tCode = heap[iSrcHeap];
+      final int tgtTypeCode = isTypeMapping ? typeMapper.mapTypeCodeSrc2Tgt(tCode) : tCode;
+      final boolean isIncludedType = (tgtTypeCode != 0);
+      
+      // record info for type
+      fsStartIndexes.addItemAddr(iSrcHeap, iTgtHeap, isIncludedType);  // maps src heap to tgt seq
+      
+      // for features in type - 
+      //    strings: accumulate those strings that are in the target, if optimizeStrings != null
+      //      strings either in array, or in individual values
+      //    byte (array), short (array), long/double (instance or array): record if entries in aux array are skipped
+      //      (not in the target).  Note the recording will be in a non-ordered manner (due to possible updates by
+      //       previous delta deserialization)
+      final TypeInfo srcTypeInfo = ts.getTypeInfo(tCode);
+      final TypeInfo tgtTypeInfo = (isTypeMapping && isIncludedType) ? 
+          typeMapper.tsTgt.getTypeInfo(tgtTypeCode) : 
+          srcTypeInfo;
+      
+      
+      // add strings for included types (only when serializing)
+      if (isIncludedType && (os != null)) { 
+
+        // next test only true if tgtTypeInfo.slotKinds[1] == Slot_StrRef
+        // because this is the built-in type string array which is final
+        if (srcTypeInfo.isHeapStoredArray && (srcTypeInfo.slotKinds[1] == Slot_StrRef)) {
+          for (int i = 0; i < heap[iSrcHeap + 1]; i++) {
+            // this bit of strange logic depends on the fact that all new and updated strings
+            // are "added" at the end of the string heap in the current impl
+            final int strHeapIndex = heap[iSrcHeap + 2 + i];
+            if (strHeapIndex >= markStringHeap) {
+              os.add(stringHeapObj.getStringForCode(strHeapIndex));
+            }
+          }
+        } else {
+          final int[] strOffsets = srcTypeInfo.strRefOffsets;
+          final boolean[] fSrcInTgt = isTypeMapping ? typeMapper.getFSrcInTgt(tCode) : null;
+          for (int i = 0; i < strOffsets.length; i++ ) {
+            int srcOffset = strOffsets[i];  // offset to slot having str ref
+            // add only those strings in slots that are in target
+            if (!isTypeMapping || fSrcInTgt[srcOffset]) {
+              final int strHeapIndex = heap[iSrcHeap + strOffsets[i]];
+              // this bit of strange logic depends on the fact that all new and updated strings
+              // are "added" at the end of the string heap in the current impl
+              if (strHeapIndex >= markStringHeap) {
+                os.add(stringHeapObj.getStringForCode(strHeapIndex));
+              }
+            }
+          }
+        }
+      }
+            
+      // Advance to next Feature Structure, in both source and target heap frame of reference
+      if (isIncludedType) {
+        final int deltaTgtHeap = incrToNextFs(heap, iSrcHeap, tgtTypeInfo);
+        nextTgtHeap += deltaTgtHeap;
+        if (iSrcHeap >= heapStart) {  // don't use up tgt heap if delta, and below the mark
+          tgtHeapUsed += deltaTgtHeap;
+        }
+      }
+    }
+    
+    return tgtHeapUsed;  // side effect: set up fsStartIndexes
+  } 
+
+
+  /**
+   * Compare 2 CASes, with perhaps different type systems.
+   * If the type systems are different, construct a type mapper and use that
+   *   to selectively ignore types or features not in other type system
+   * 
+   * Compare only feature structures reachable via indexes or refs
+   *   The order must match
+   * 
+   * @param c1 CAS to compare
+   * @param c2 CAS to compare
+   * @return true if equal (for types / features in both)
+   */
+  public static boolean compareCASes(CASImpl c1, CASImpl c2) {
+    BinaryCasSerDes6 containingInstance = new BinaryCasSerDes6(c1);
+    return (containingInstance.new CasCompare(c1, c2)).compareCASes();
+  }
+  
+  public class CasCompare {
+    /** 
+     * Compare 2 CASes for equal
+     * The layout of refs to aux heaps does not have to match
+     */
+      final private CASImpl c1;
+      final private CASImpl c2;
+      final private TypeSystemImpl ts1;      
+      final private TypeSystemImpl ts2;
+      final private boolean isTypeMapping;
+      final private CasTypeSystemMapper typeMapper;
+      final private Heap c1HO;
+      final private Heap c2HO;
+      final private int[] c1heap;
+      final private int[] c2heap;
+      
+      private TypeInfo typeInfo;
+      private int c1heapIndex;
+      private int c2heapIndex;
+      
+      private IntRedBlackTree addr2seq1 = new IntRedBlackTree();
+      private IntRedBlackTree addr2seq2 = new IntRedBlackTree();
+            
+    public CasCompare(CASImpl c1, CASImpl c2) {
+      this.c1 = c1;
+      this.c2 = c2;
+      ts1 = c1.getTypeSystemImpl();
+      ts2 = c2.getTypeSystemImpl();
+      isTypeMapping = (ts1 != ts2);
+      typeMapper = isTypeMapping ? new CasTypeSystemMapper(ts1, ts2) : null;
+      c1HO = c1.getHeap();
+      c2HO = c2.getHeap();
+      c1heap = c1HO.heap;
+      c2heap = c2HO.heap;      
+    }
+      
+    public boolean compareCASes() {
+      IntArrayRBT c1FoundFSs;
+      IntArrayRBT c2FoundFSs;
+      try {
+        processIndexedFeatureStructures(c1, false);
+        c1FoundFSs = foundFSs;
+        foundFSs = new IntArrayRBT();
+        processIndexedFeatureStructures(c2, false);
+        c2FoundFSs = foundFSs;
+      } catch (IOException e) {
+        throw new RuntimeException(e);  // never happen
+      }
+      IntPointerIterator c1Iterator = c1FoundFSs.pointerIterator();
+      IntPointerIterator c2Iterator = c2FoundFSs.pointerIterator();
+
+      int i = 0;
+      while (c1Iterator.isValid()) {
+        addr2seq1.put(c1Iterator.get(), i++);
+        c1Iterator.inc();
+      }
+      i = 0;
+      while (c2Iterator.isValid()) {
+        addr2seq2.put(c2Iterator.get(), i++);
+        c2Iterator.inc();
+      }
+      
+      c1Iterator.moveToFirst();
+      c2Iterator.moveToFirst();
+      
+//      initFsStartIndexesCompare();
+      
+      // Iterating over both CASes
+      //   If c1 is past end, verify all c2 instances up to its end are not in c1
+      //   If c2 is past end, verify all c1 instances up to its end are not in c1
+      //   If c1's instance type exists in c2, compare & advance both iterators
+      //   If c1's instance type doesn't exist in c2, advance c1 and continue
+      //   If c2's instance type doesn't exist in c1, advance c2 and continue
+      
+      
+//      final int endHeapSeqSrc = fsStartIndexes.getNbrOfItems();
+//      c1heapIndex = 1;
+//      c2heapIndex = 1;
+//      boolean pastEnd1 = false;
+//      boolean pastEnd2 = false;      
+      
+      while (c1Iterator.isValid() && c2Iterator.isValid()) {
+        c1heapIndex = c1Iterator.get();
+        c2heapIndex = c2Iterator.get();

[... 617 lines stripped ...]