You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2012/11/06 19:22:04 UTC

svn commit: r1406261 [2/4] - in /uima/uimaj/trunk/uimaj-core/src: main/java/org/apache/uima/cas/ main/java/org/apache/uima/cas/impl/ main/java/org/apache/uima/util/ main/java/org/apache/uima/util/impl/ main/resources/org/apache/uima/ test/java/org/apac...

Added: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes4.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes4.java?rev=1406261&view=auto
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes4.java (added)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes4.java Tue Nov  6 18:22:03 2012
@@ -0,0 +1,2771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.cas.impl;
+
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.NBR_SLOT_KIND_ZIP_STREAMS;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_ArrayLength;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Boolean;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_BooleanRef;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Byte;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_ByteRef;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Control;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_DoubleRef;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Double_Exponent;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Double_Mantissa_Sign;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Float;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Float_Exponent;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Float_Mantissa_Sign;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_FsIndexes;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_HeapRef;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Int;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_LongRef;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Long_High;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Long_Low;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_MainHeap;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_Short;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_ShortRef;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_StrChars;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_StrLength;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_StrOffset;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_StrRef;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_StrSeg;
+import static org.apache.uima.cas.impl.BinaryCasSerDes4.SlotKind.Slot_TypeCode;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.uima.cas.AbstractCas;
+import org.apache.uima.cas.CASRuntimeException;
+import org.apache.uima.cas.Feature;
+import org.apache.uima.cas.Marker;
+import org.apache.uima.internal.util.IntVector;
+import org.apache.uima.jcas.JCas;
+import org.apache.uima.util.SerializationMeasures;
+import org.apache.uima.util.impl.DataIO;
+import org.apache.uima.util.impl.OptimizeStrings;
+
+/**
+ * User callable serialization and deserialization of the CAS in a compressed Binary Format
+ * 
+ * This serializes/deserializes the state of the CAS, assuming that the type
+ * information remains constant.
+ * 
+ * Header specifies to reader the format, and the compression level.
+ * 
+ * How to Serialize:  
+ * 
+ * 1) create an instance of this class, specifying some options that don't change very much
+ * 2) call serialize(CAS) to serialize the cas * 
+ * 
+ * You can reuse the instance for a different CAS (as long as the type system is the same);
+ * this will save setup time.
+ * 
+ * This class lazily constructs customized TypeInfo instances for each type encountered in serializing.
+ * These are preserved across multiple serialization calls, so their setup / initialization is only
+ * needed the first time.
+ * 
+ * The form of the binary CAS is inserted at the beginning so that receivers can do the
+ * proper deserialization.  
+ *     
+ * Binary format requires that the exact same type system be used when deserializing 
+ * 
+ * How to Deserialize:
+ * 
+ * 1) get an appropriate CAS to deserialize into.  For delta CAS, it does not have to be empty.
+ * 2) call CASImpl: cas.reinit(inputStream)  This is the existing method
+ *    for binary deserialization, and it now handles this compressed version, too.
+ *    Delta cas is also supported.
+ * 
+ * Compression/Decompression
+ * Works in two stages:
+ *   application of Zip/Unzip to particular sub-collections of CAS data, 
+ *     grouped according to similar data distribution
+ *   collection of like kinds of data (to make the zipping more effective)
+ *   There can be up to ~20 of these collections, such as
+ *      control info, float-exponents, string chars
+ * Deserialization:
+ *   Read all bytes, 
+ *   create separate ByteArrayInputStreams for each segment, sharing byte bfr
+ *   create appropriate unzip data input streams for these
+ */
+public class BinaryCasSerDes4 {
+  
+  public static final int TYPECODE_COMPR = 8;
+  public static final boolean CHANGE_FS_REFS_TO_SEQUENTIAL = true;
+  // may add more later - to specify differing trade-offs between speed and compression
+  public enum Compression {None, Compress};  
+  public static final boolean IS_DIFF_ENCODE = true;
+  public static final boolean CAN_BE_NEGATIVE = true;
+  public static final boolean IGNORED = true;
+  public static final boolean IN_MAIN_HEAP = true;
+  
+  private static final long DBL_1 = Double.doubleToLongBits(1D);
+
+  /**
+   * The kinds of slots that can exist 
+   *   an index for getting type-code specific values, 
+   *   flag - whether or not they should be diff encoded
+   *   flag - if they can be negative (and need their sign moved)
+   *   
+   * Some are real slots in the heap; others are descriptions of
+   *   parts of values, eg. float exponent
+   *   
+   * Difference encoding costs 1 bit.  
+   *   Measurements show it can lessen zip's effectiveness 
+   *     (especially for single byte values (?)),
+   *     probably because it causes more dispersion in 
+   *     the value kinds.  
+   *   Because of this 2-fold cost (1 bit and less zip),
+   *     differencing being tried only for multi-byte 
+   *     values (short, int, long), and heap refs
+   *     - for array values, diff is with prev array value
+   *       (for 1st value in array, diff is with prev FeatureStructure
+   *       of the same type in the heap's 1st value if it exists
+   *     - for non-array values or 1st array value, diff is with
+   *       prev heap value for same type in heap  
+   * 
+   *   Not done for float parts - exponent too short, and
+   *     mantissa too random.
+   * 
+   * CanBeNegative
+   *   Many values are only positive e.g., array lengths
+   *   Some values can be negative
+   *     (all difference-encoded things can be negative)
+   *   Represent as 1 bit + positive number, sign bit in 
+   *     least sig. bit position.  This allows the
+   *     bits to cluster closer to 0 on the positive side,
+   *     which can make for fewer bytes to represent the number.
+   */
+
+  /**
+   * Compression alternatives
+   */
+  
+  public enum CompressLevel {
+    None(   Deflater.NO_COMPRESSION),
+    Fast(   Deflater.BEST_SPEED),
+    Default(Deflater.DEFAULT_COMPRESSION),
+    Best(   Deflater.BEST_COMPRESSION),
+    ;
+    final public int lvl;
+    CompressLevel(int lvl) {
+      this.lvl = lvl;
+    }
+  }
+  
+  public enum CompressStrat {
+    Default(      Deflater.DEFAULT_STRATEGY),
+    Filtered(     Deflater.FILTERED),
+    HuffmanOnly(  Deflater.HUFFMAN_ONLY),
+    ;
+    final public int strat;
+    CompressStrat(int strat) {
+      this.strat = strat;
+    }
+  }
+  
+  /**
+   * Define all the slot kinds.
+   */
+  public enum SlotKind {
+    Slot_ArrayLength(! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 4, IN_MAIN_HEAP),
+    Slot_HeapRef(    IS_DIFF_ENCODE,             IGNORED, 4, IN_MAIN_HEAP),
+    Slot_Int(        IS_DIFF_ENCODE,             IGNORED, 4, IN_MAIN_HEAP),
+    Slot_Byte(       ! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 4, IN_MAIN_HEAP),
+    Slot_Short(      IS_DIFF_ENCODE,             IGNORED, 4, IN_MAIN_HEAP),
+    Slot_TypeCode(   ! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 4, IN_MAIN_HEAP),
+
+    Slot_StrOffset(  ! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 4, !IN_MAIN_HEAP),
+    Slot_StrLength(  ! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 4, !IN_MAIN_HEAP),
+    Slot_Long_High(    IS_DIFF_ENCODE,           IGNORED, 0, !IN_MAIN_HEAP),
+    Slot_Long_Low (    IS_DIFF_ENCODE,           IGNORED, 0, !IN_MAIN_HEAP),
+
+    // the next are not actual slot kinds, but instead
+    // are codes used to control encoding of Floats and Doubles.
+    Slot_Float_Mantissa_Sign( ! IS_DIFF_ENCODE, CAN_BE_NEGATIVE, 0, !IN_MAIN_HEAP),
+    // exponent is 8 bits, and shifted in the expectation
+    // that many values may be between 1 and 0 (e.g., normalized values)
+    //   -- so sign moving is needed
+    Slot_Float_Exponent(      ! IS_DIFF_ENCODE, CAN_BE_NEGATIVE, 0, !IN_MAIN_HEAP),
+    
+    Slot_Double_Mantissa_Sign(! IS_DIFF_ENCODE, CAN_BE_NEGATIVE, 0, !IN_MAIN_HEAP),
+    Slot_Double_Exponent(     ! IS_DIFF_ENCODE, CAN_BE_NEGATIVE, 0, !IN_MAIN_HEAP),
+    Slot_FsIndexes(             IS_DIFF_ENCODE,         IGNORED, 4, !IN_MAIN_HEAP),
+    
+    Slot_StrChars(            IGNORED,          IGNORED, 2, !IN_MAIN_HEAP),
+    
+    Slot_Control(             IGNORED,          IGNORED, 0, !IN_MAIN_HEAP),
+    Slot_StrSeg(              ! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 0, ! IN_MAIN_HEAP),
+    
+    // the next slots are not serialized
+    Slot_StrRef(     IS_DIFF_ENCODE,             IGNORED, 4, IN_MAIN_HEAP),
+    Slot_BooleanRef( ! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 4, IN_MAIN_HEAP),
+    Slot_ByteRef(    IS_DIFF_ENCODE,             IGNORED, 4, IN_MAIN_HEAP),
+    Slot_ShortRef(   IS_DIFF_ENCODE,             IGNORED, 4, IN_MAIN_HEAP),
+    Slot_LongRef(    IS_DIFF_ENCODE,             IGNORED, 4, IN_MAIN_HEAP),
+    Slot_DoubleRef(  IS_DIFF_ENCODE,             IGNORED, 4, IN_MAIN_HEAP),
+    Slot_Float(      ! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 4, IN_MAIN_HEAP),
+    Slot_Boolean(    ! IS_DIFF_ENCODE, ! CAN_BE_NEGATIVE, 4, IN_MAIN_HEAP),
+    // next used to capture original heap size
+    Slot_MainHeap(   IGNORED,          IGNORED,           4, !IN_MAIN_HEAP),
+
+    ;
+
+    public final int i;
+    public final boolean isDiffEncode;
+    public final boolean canBeNegative;
+    public final boolean inMainHeap;
+    public final int elementSize;
+    
+    public static final int NBR_SLOT_KIND_ZIP_STREAMS;
+    static {NBR_SLOT_KIND_ZIP_STREAMS = Slot_StrRef.i;}
+    
+    SlotKind(boolean isDiffEncode, 
+             boolean canBeNegative, 
+             int elementSize,
+             boolean inMainHeap) {
+      this.i = this.ordinal();
+      this.isDiffEncode = isDiffEncode;
+      this.canBeNegative = isDiffEncode ? true : canBeNegative;
+      this.elementSize = elementSize; 
+      this.inMainHeap = inMainHeap;
+    }
+  }
+  
+  /**
+   * Things set up for one instance of this class, and
+   * reuse-able
+   */
+  final private TypeInfo [] typeInfoArray;  // lazy initialization of elements
+  final private TypeSystemImpl ts;  // for debugging
+  final private boolean doMeasurements;
+  
+  // speedups
+  final private static int arrayLength_i = Slot_ArrayLength.i;
+  final private static int heapRef_i = Slot_HeapRef.i;
+  final private static int int_i = Slot_Int.i;
+  final private static int byte_i = Slot_Byte.ordinal();
+  final private static int short_i = Slot_Short.i;
+  final private static int typeCode_i = Slot_TypeCode.i;
+  final private static int strOffset_i = Slot_StrOffset.i;
+  final private static int strLength_i = Slot_StrLength.i;
+  final private static int long_High_i = Slot_Long_High.i;
+  final private static int long_Low_i = Slot_Long_Low.i;
+  final private static int float_Mantissa_Sign_i = Slot_Float_Mantissa_Sign.i;
+  final private static int float_Exponent_i = Slot_Float_Exponent.i;
+  final private static int double_Mantissa_Sign_i = Slot_Double_Mantissa_Sign.i;
+  final private static int double_Exponent_i = Slot_Double_Exponent.i;
+  final private static int fsIndexes_i = Slot_FsIndexes.i;
+  final private static int strChars_i = Slot_StrChars.i;
+  final private static int control_i = Slot_Control.i;
+  final private static int strSeg_i = Slot_StrSeg.i;
+  
+  /**
+   * 
+   * @param ts
+   * @param doMeasurements - normally set this to false. 
+   */
+  public BinaryCasSerDes4(TypeSystemImpl ts, boolean doMeasurements) {
+    this.ts = ts;
+    this.doMeasurements = doMeasurements;
+
+    typeInfoArray = new TypeInfo[(ts.getTypeArraySize())];
+  }
+
+  /**
+   * 
+   * @param cas
+   * @param out
+   * @param trackingMark
+   * @return null or serialization measurements (depending on setting of doMeasurements)
+   * @throws IOException
+   */
+  public SerializationMeasures serialize(AbstractCas cas, Object out, Marker trackingMark,
+      CompressLevel compressLevel, CompressStrat compressStrategy) throws IOException {
+    SerializationMeasures sm = (doMeasurements) ? new SerializationMeasures() : null;
+    CASImpl casImpl = (CASImpl) ((cas instanceof JCas) ? ((JCas)cas).getCas(): cas);
+    if (null != trackingMark && !trackingMark.isValid() ) {
+      throw new CASRuntimeException(
+                CASRuntimeException.INVALID_MARKER, new String[] { "Invalid Marker." });
+    }
+    
+    Serializer serializer = new Serializer(
+        casImpl, makeDataOutputStream(out), (MarkerImpl) trackingMark, sm,
+        compressLevel, compressStrategy);
+   
+    serializer.serialize();
+    return sm;
+  }
+  
+  public SerializationMeasures serialize(AbstractCas cas, Object out, Marker trackingMark,
+      CompressLevel compressLevel) throws IOException {
+    return serialize(cas, out,trackingMark, compressLevel, CompressStrat.Default);
+  }
+  
+  public SerializationMeasures serialize(AbstractCas cas, Object out, Marker trackingMark) throws IOException {
+    return serialize(cas, out,trackingMark, CompressLevel.Default, CompressStrat.Default);
+  }
+
+  public SerializationMeasures serialize(AbstractCas cas, Object out) throws IOException {
+    return serialize(cas, out, null);
+  }
+
+  public void deserialize(CASImpl cas, InputStream deserIn, boolean isDelta) throws IOException {
+    DataInput in;
+    if (deserIn instanceof DataInputStream) {
+      in = (DataInputStream)deserIn;
+    } else {
+      in = new DataInputStream(deserIn);
+    }
+    Deserializer deserializer = new Deserializer(cas, in, isDelta);    
+    deserializer.deserialize();
+  }
+
+  /**
+   * Class instantiated once per serialization
+   * Multiple serializations in parallel supported, with
+   * multiple instances of this
+   */
+  
+  private class Serializer {
+    final private DataOutputStream serializedOut;  // where to write out the serialized result
+    final private CASImpl cas;  // cas being serialized
+    final private MarkerImpl mark;  // the mark to serialize from
+    
+    final private SerializationMeasures sm;  // null or serialization measurements
+    final private ByteArrayOutputStream[] baosZipSources = new ByteArrayOutputStream[NBR_SLOT_KIND_ZIP_STREAMS];  // lazily created, indexed by SlotKind.i
+    final private DataOutputStream[] dosZipSources = new DataOutputStream[NBR_SLOT_KIND_ZIP_STREAMS];      // lazily created, indexed by SlotKind.i
+
+    final private int[] heap;           // main heap
+    private int heapStart;
+    final private int heapEnd;
+    final private StringHeap stringHeapObj;
+    final private LongHeap longHeapObj;
+    final private ShortHeap shortHeapObj;
+    final private ByteHeap byteHeapObj;
+
+    final private boolean isDelta;        // if true, there is a marker indicating the start spot(s)
+    final private boolean doMeasurement;  // if true, doing measurements
+    final private ComprItemRefs fsStartIndexes = (CHANGE_FS_REFS_TO_SEQUENTIAL) ? new ComprItemRefs() : null;
+    final private int[] typeCodeHisto = new int[ts.getTypeArraySize()]; 
+    final private Integer[] serializedTypeCode2Code = new Integer[ts.getTypeArraySize()]; // needs to be Integer to get comparator choice
+    final private int[] estimatedZipSize = new int[NBR_SLOT_KIND_ZIP_STREAMS]; // one entry for each output stream kind
+    final private OptimizeStrings os;
+    final private CompressLevel compressLevel;
+    final private CompressStrat compressStrategy;
+    
+    // typeInfo is local to this serialization instance to permit multiple threads
+    private TypeInfo typeInfo; // type info for the current type being serialized
+    private int iPrevHeap;        // 0 or heap addr of previous instance of current type
+    private boolean only1CommonString;  // true if only one common string
+    
+    // speedups
+    
+    // any use of these means caller handles measurement
+    // some of these are never used, because the current impl
+    //   is using the _i form to get measurements done
+    final private DataOutputStream arrayLength_dos;
+    final private DataOutputStream heapRef_dos;
+    final private DataOutputStream int_dos;
+    final private DataOutputStream byte_dos;
+    final private DataOutputStream short_dos;
+    final private DataOutputStream typeCode_dos;
+    final private DataOutputStream strOffset_dos;
+    final private DataOutputStream strLength_dos;
+    final private DataOutputStream long_High_dos;
+    final private DataOutputStream long_Low_dos;
+    final private DataOutputStream float_Mantissa_Sign_dos;
+    final private DataOutputStream float_Exponent_dos;
+    final private DataOutputStream double_Mantissa_Sign_dos;
+    final private DataOutputStream double_Exponent_dos;
+    final private DataOutputStream fsIndexes_dos;
+    final private DataOutputStream strChars_dos;
+    final private DataOutputStream control_dos;
+    final private DataOutputStream strSeg_dos;
+
+    private Serializer(CASImpl cas, DataOutputStream serializedOut, MarkerImpl mark,
+                       SerializationMeasures sm,
+                       CompressLevel compressLevel,
+                       CompressStrat compressStrategy) {
+      this.cas = cas;
+      this.serializedOut = serializedOut;
+      this.mark = mark;
+      this.sm = sm;
+      this.compressLevel = compressLevel;
+      this.compressStrategy = compressStrategy;
+      isDelta = (mark != null);
+      doMeasurement = (sm != null);
+      
+      heap = cas.getHeap().heap;
+      heapEnd = cas.getHeap().getCellsUsed();
+      heapStart = isDelta ? mark.getNextFSId() : 0;
+      
+      stringHeapObj = cas.getStringHeap();
+      longHeapObj   = cas.getLongHeap();
+      shortHeapObj  = cas.getShortHeap();
+      byteHeapObj   = cas.getByteHeap();
+     
+      os = new OptimizeStrings(doMeasurement);
+      
+      setupOutputStreams();   
+      arrayLength_dos = dosZipSources[arrayLength_i];
+      heapRef_dos = dosZipSources[heapRef_i];
+      int_dos = dosZipSources[int_i];
+      byte_dos = dosZipSources[byte_i];
+      short_dos = dosZipSources[short_i];
+      typeCode_dos = dosZipSources[typeCode_i];
+      strOffset_dos = dosZipSources[strOffset_i];
+      strLength_dos = dosZipSources[strLength_i];
+      long_High_dos = dosZipSources[long_High_i];
+      long_Low_dos = dosZipSources[long_Low_i];
+      float_Mantissa_Sign_dos = dosZipSources[float_Mantissa_Sign_i];
+      float_Exponent_dos = dosZipSources[float_Exponent_i];
+      double_Mantissa_Sign_dos = dosZipSources[double_Mantissa_Sign_i];
+      double_Exponent_dos = dosZipSources[double_Exponent_i];
+      fsIndexes_dos = dosZipSources[fsIndexes_i];
+      strChars_dos = dosZipSources[strChars_i];
+      control_dos = dosZipSources[control_i];
+      strSeg_dos = dosZipSources[strSeg_i];
+    }
+    
+    private void setupOutputStreams() {
+      // estimate model:
+      //   33% of space in strings, 33% in heap, 33% other
+      //   compr ratio for heap is 98%
+      int compr = (heapEnd - heapStart) * 8 / 3 / 50;
+      int compr1000 = Math.max(512, compr/1000);
+      // 2nd arg is the number of bytes in the byte output stream, initially
+      estimatedZipSize[typeCode_i] = Math.max(512, compr/4);  // /4 for ~4 slots per fs 
+//      estimatedZipSize[boolean_i] =compr1000;
+      estimatedZipSize[byte_i] = compr1000;
+      estimatedZipSize[short_i] = compr1000;
+      estimatedZipSize[int_i] = Math.max(1024, compr1000);
+      estimatedZipSize[arrayLength_i] = compr1000;
+      estimatedZipSize[float_Mantissa_Sign_i] = compr1000;
+      estimatedZipSize[float_Exponent_i] = compr1000;
+      estimatedZipSize[double_Mantissa_Sign_i] = compr1000;
+      estimatedZipSize[double_Exponent_i] = compr1000;
+      estimatedZipSize[long_High_i] = compr1000;
+      estimatedZipSize[long_Low_i] = compr1000;
+      estimatedZipSize[heapRef_i] = Math.max(1024, compr1000);
+      estimatedZipSize[strOffset_i] = Math.max(512, compr/4);
+      estimatedZipSize[strLength_i] = Math.max(512, compr/4);
+      estimatedZipSize[fsIndexes_i] = Math.max(512, compr/8);  // /4 for ~4 slots/fs, / 2 for # indexed
+      estimatedZipSize[strChars_i] = Math.max(512, compr/4); // strings compress better
+      estimatedZipSize[control_i] = 128;
+      
+      for (int i = 0; i < baosZipSources.length; i++) {
+        setupOutputStream(i);
+      }
+
+    }
+    
+    private void serialize() throws IOException {   
+
+      if (doMeasurement) {
+        System.out.println(printCasInfo(cas));
+        sm.origAuxBytes = cas.getByteHeap().getSize();
+        sm.origAuxShorts = cas.getShortHeap().getSize() * 2;
+        sm.origAuxLongs = cas.getLongHeap().getSize() * 8;
+        sm.totalTime = System.currentTimeMillis();
+      }
+
+      /******************
+       * Process Header  
+       * Standardized    
+       ******************/
+      // encode: bits 7 6 5 4 3 2 1 0
+      //                        0 0 1 = no delta, no compression
+      //                        0 1 - = delta, no compression
+      //                        1 d - = compression, w/wo delta
+      int version = 4 | ((isDelta) ? 2 : 0);
+      CASSerializer.outputVersion(version, serializedOut);
+        
+      serializedOut.writeInt(0);  // reserved for future version info
+      if (doMeasurement) {
+        sm.header = 12;
+      }
+           
+      /**************************
+       * Strings
+       **************************/
+      int stringHeapStart = isDelta ? mark.nextStringHeapAddr : 1;
+      int stringHeapEnd = stringHeapObj.getSize();
+ 
+      for (int i = stringHeapStart; i < stringHeapEnd; i++) {
+        os.add(stringHeapObj.getStringForCode(i));
+      }
+      
+      // also add in all modified strings
+      // with current design, all modified strings are guaranteed
+      // to be above the mark, so this code is commented out
+//    addModStr:
+//      if (isDelta) {
+//        serializerForModifiedFSs.sortModifications();
+//        final int[] mods = serializerForModifiedFSs.modifiedMainHeapAddrs;
+//        final int modLen = serializerForModifiedFSs.modMainHeapAddrsLength;
+//        int nextMod = 0;
+//        if (modLen > 0) {
+// 
+//          for (int iHeap = 1; iHeap < heapStart; iHeap += incrToNextFs(heap, iHeap, typeInfo)) {
+//            typeInfo = getTypeInfo(heap[iHeap]);
+//            FsStringRefs fsStringRefs = new FsStringRefs(typeInfo, heap, iHeap);
+//            for (int nextRef = fsStringRefs.next(); nextRef >= 0; nextRef = fsStringRefs.next()) {
+//              while (nextRef > mods[nextMod]) {
+//                nextMod ++;
+//                if (nextMod == modLen) {
+//                  break addModStr;
+//                }
+//              }
+//              if (nextRef == mods[nextMod]) {
+//                os.add(stringHeapObj.getStringForCode(heap[nextRef]));
+//                nextMod ++;
+//                if (nextMod == modLen) {
+//                  break addModStr;
+//                }
+//              } 
+//            }
+//          }   
+//        }
+//      }
+
+      os.optimize();
+      String [] commonStrings = os.getCommonStrings();
+      writeVnumber(strChars_i, commonStrings.length);
+      for (int i = 0; i < commonStrings.length; i++) {
+        int startPos = dosZipSources[strChars_i].size();
+        DataIO.writeUTFv(commonStrings[i], dosZipSources[strChars_i]);
+        // approximate histogram
+        if (doMeasurement) {
+          // len is utf-8 encoding
+          float len = dosZipSources[strChars_i].size() - startPos;
+          // if len == chars, then all got coded as 1 byte
+          // if len > chars, some were utf-8 coded as 2 bytes
+          float excess = (len / commonStrings[i].length()) - 1;  // excess over length 1
+          int encAs2 = (int)(excess * commonStrings[i].length());
+          
+          // simulate histo for all the chars, as 1 or 2 byte UTF8 encoding 
+          sm.statDetails[strChars_i].countTotal += commonStrings[i].length(); // total chars accum
+          sm.statDetails[strChars_i].c[0] = commonStrings[i].length() - encAs2;
+          sm.statDetails[strChars_i].c[1] = encAs2;
+          sm.statDetails[strChars_i].lengthTotal += len;  // total as UTF-8 encode
+        }
+      }
+      
+      only1CommonString = commonStrings.length == 1;
+
+      if (doMeasurement) {
+        long commonStringsLength = 0;
+        sm.stringsNbrCommon = commonStrings.length;
+        int r = 0;
+        for (int i = 0; i < commonStrings.length; i++) {
+          r += DataIO.lengthUTFv(commonStrings[i]);
+          commonStringsLength += commonStrings[i].length();
+        }
+        sm.stringsCommonChars = r;
+
+        sm.stringsSavedExact = os.getSavedCharsExact() * 2;
+        sm.stringsSavedSubstr = os.getSavedCharsSubstr() * 2;
+        sm.statDetails[strChars_i].original = os.getSavedCharsExact() * 2
+                                 + os.getSavedCharsSubstr() * 2
+                                 + commonStringsLength * 2;
+        sm.statDetails[strLength_i].original = (stringHeapEnd - stringHeapStart) * 4;
+        sm.statDetails[strOffset_i].original = (stringHeapEnd - stringHeapStart) * 4;
+      }
+      
+      /***************************
+       * Prepare to walk main heap
+       ***************************/
+      writeVnumber(control_dos, heapEnd - heapStart);  
+      if (doMeasurement) {
+        sm.statDetails[Slot_MainHeap.i].original = (1 + heapEnd - heapStart) * 4;      
+      }
+      
+      resetIprevious();
+
+      if (heapStart == 0) {
+        heapStart = 1;  // slot 0 not serialized, it's null / 0
+      }
+
+      if (CHANGE_FS_REFS_TO_SEQUENTIAL) {
+        // scan thru all fs and save their offsets in the heap
+        // to allow conversion from addr to sequential fs numbers
+        initFsStartIndexes(fsStartIndexes, heap, heapStart, heapEnd, typeCodeHisto);
+        
+        for (int i = ts.getTypeArraySize() - 1; i >= 0; i--) {
+          serializedTypeCode2Code[i] = i;
+        }
+        
+        // set typeCode2serializeCode so that the 0th element is the typeCode with the highest frequency, etc.
+        Arrays.sort(serializedTypeCode2Code, 0, serializedTypeCode2Code.length, new Comparator<Integer>() {
+          public int compare(Integer o1, Integer o2) {
+            return (typeCodeHisto[o1] > typeCodeHisto[o2]) ? -1 :
+                   (typeCodeHisto[o1] < typeCodeHisto[o2]) ? 1 : 0;
+          }
+        });
+        
+//        for (int i = 0; i < serializedTypeCode2Code.length; i++) {
+//          int tCode = serializedTypeCode2Code[i];
+//          int c = typeCodeHisto[tCode];
+//          if (c > 0) {
+//            System.out.format("%2d %,9d instance of Type %s%n", i, c, typeInfoArray[tCode]);
+//          }
+//        }
+        
+      }
+
+      
+      
+      /***************************
+       * walk main heap
+       ***************************/
+
+      for (int iHeap = heapStart; iHeap < heapEnd; iHeap += incrToNextFs(heap, iHeap, typeInfo)) {
+        int tCode = heap[iHeap];  // get type code      
+        typeInfo = getTypeInfo(tCode);
+        iPrevHeap = typeInfo.iPrevHeap;
+        
+        writeVnumber(typeCode_dos, tCode);
+
+        if (typeInfo.isHeapStoredArray) {
+          serializeHeapStoredArray(iHeap);
+        } else if (typeInfo.isArray) {
+          serializeNonHeapStoredArray(iHeap);
+        } else {
+          for (int i = 1; i < typeInfo.slotKinds.length + 1; i++) {
+            serializeByKind(iHeap, i);
+          }
+        }
+      
+        typeInfo.iPrevHeap = iHeap;  // make this one the "prev" one for subsequent testing
+        if (doMeasurement) {
+          sm.statDetails[typeCode_i].incr(DataIO.lengthVnumber(tCode));
+          sm.mainHeapFSs ++;
+        }
+      }  // end of heap walk
+      
+      serializeIndexedFeatureStructures();
+
+      if (isDelta) {
+        (new SerializeModifiedFSs()).serializeModifiedFSs();
+      }
+
+      collectAndZip();
+      
+      if (doMeasurement) {
+        sm.totalTime = System.currentTimeMillis() - sm.totalTime;
+      }
+    }
+    
+    
+    private void serializeIndexedFeatureStructures() throws IOException {
+      int[] fsIndexes = isDelta ? cas.getDeltaIndexedFSs(mark) : cas.getIndexedFSs();
+      if (doMeasurement) {
+        sm.statDetails[fsIndexes_i].original = fsIndexes.length * 4 + 1;      
+      }
+      int nbrViews = fsIndexes[0];
+      int nbrSofas = fsIndexes[1];
+      writeVnumber(control_i, nbrViews);
+      writeVnumber(control_i, nbrSofas);
+      
+      if (doMeasurement) {
+        sm.statDetails[fsIndexes_i].incr(1); // an approximation - probably correct
+        sm.statDetails[fsIndexes_i].incr(1);
+      }
+      
+      int fi = 2;
+      final int end1 = nbrSofas + 2;
+      for (; fi < end1; fi++) {
+        writeVnumber(control_i, fsIndexes[fi]);
+        
+        if (doMeasurement) {
+          sm.statDetails[fsIndexes_i].incr(DataIO.lengthVnumber(fsIndexes[fi]));
+        }
+      }
+       
+      for (int vi = 0; vi < nbrViews; vi++) {
+        fi = compressFsxPart(fsIndexes, fi);    // added FSs
+        if (isDelta) {
+          fi = compressFsxPart(fsIndexes, fi);  // removed FSs
+          fi = compressFsxPart(fsIndexes, fi);  // reindexed FSs
+        }
+      }      
+    }
+
+    private int compressFsxPart(int[] fsIndexes, int fsNdxStart) throws IOException {
+      int ix = fsNdxStart;
+      int nbrEntries = fsIndexes[ix++];
+      int end = ix + nbrEntries;
+      writeVnumber(fsIndexes_dos, nbrEntries);  // number of entries
+      if (doMeasurement) {
+        sm.statDetails[typeCode_i].incr(DataIO.lengthVnumber(nbrEntries));
+      }
+      
+      final int[] ia = new int[nbrEntries];
+      System.arraycopy(fsIndexes, ix, ia, 0, nbrEntries);
+      Arrays.sort(ia);
+     
+      int prev = 0;
+      
+      for (int i = 0; i < ia.length; i++) {
+        int v = ia[i];
+        if (CHANGE_FS_REFS_TO_SEQUENTIAL) {
+          v = fsStartIndexes.getItemIndex(v);
+        }
+        writeVnumber(fsIndexes_dos, v - prev);
+        if (doMeasurement) {
+          sm.statDetails[fsIndexes_i].incr(DataIO.lengthVnumber(v - prev));
+        }
+        prev = v;
+        
+      }
+      return end;
+    } 
+
+    private void serializeHeapStoredArray(int iHeap) throws IOException {
+      final int length = serializeArrayLength(iHeap);
+      // output values
+      // special case 0 and 1st value
+      if (length == 0) {
+        return;
+      }
+      SlotKind arrayElementKind = typeInfo.slotKinds[1];
+      final int endi = iHeap + length + 2;
+      switch (arrayElementKind) {
+      case Slot_HeapRef: case Slot_Int: case Slot_Short:
+        {
+          int prev = (iPrevHeap == 0) ? 0 :
+                     (heap[iPrevHeap + 1] == 0) ? 0 :
+                      heap[iPrevHeap + 2]; 
+          for (int i = iHeap + 2; i < endi; i++) {
+            prev = writeIntOrHeapRef(arrayElementKind.i, i, prev);
+          }
+        }
+        break;
+      case Slot_Float: 
+        for (int i = iHeap + 2; i < endi; i++) {
+          writeFloat(heap[i]);
+        }
+        break;
+      case Slot_StrRef:
+        for (int i = iHeap + 2; i < endi; i++) {
+          writeString(stringHeapObj.getStringForCode(heap[i]));
+        }
+        break;
+        
+      default: throw new RuntimeException("internal error");
+      } // end of switch    
+    }
+    
+    private int writeIntOrHeapRef(int kind, int index, int prev) throws IOException {
+      final int v = heap[index];
+      writeDiff(kind, v, prev);
+      return v;
+    }
+    
+    private long writeLongFromHeapIndex(int index, long prev) throws IOException {
+      final long v = longHeapObj.getHeapValue(heap[index]);      
+      writeLong(v, prev); 
+      return v;
+    }
+    
+    private void serializeNonHeapStoredArray(int iHeap) throws IOException {
+      final int length = serializeArrayLength(iHeap);
+      if (length == 0) {
+        return;
+      }
+      SlotKind refKind = typeInfo.getSlotKind(2);
+      switch (refKind) {
+      case Slot_BooleanRef: case Slot_ByteRef:
+        writeFromByteArray(refKind, heap[iHeap + 2], length);
+        if (doMeasurement) {
+          sm.statDetails[byte_i].incr(1);
+          sm.origAuxByteArrayRefs += 4;
+        }
+        break; 
+      case Slot_ShortRef:
+        writeFromShortArray(heap[iHeap + 2], length);
+        if (doMeasurement) {
+          sm.origAuxShortArrayRefs += 4;
+        }
+        break; 
+      case Slot_LongRef: case Slot_DoubleRef:
+        writeFromLongArray(refKind, heap[iHeap + 2], length);
+        if (doMeasurement) {
+          sm.origAuxLongArrayRefs += 4;
+        }
+        break; 
+      default:
+        throw new RuntimeException();
+      }
+    }
+    
+    private void serializeByKind(int iHeap, int offset) throws IOException {
+      SlotKind kind = typeInfo.getSlotKind(offset);      
+      switch (kind) {
+      //Slot_Int, Slot_Float, Slot_Boolean, Slot_Byte, Slot_Short
+      case Slot_Int: case Slot_Short: case Slot_HeapRef:
+        serializeDiffWithPrevTypeSlot(kind, iHeap, offset);
+        break;
+      case Slot_Float:
+        writeFloat(heap[iHeap + offset]);
+        break;
+      case Slot_Boolean: case Slot_Byte:
+        byte_dos.write(heap[iHeap + offset]);
+        break;
+      case Slot_StrRef: 
+        writeString(stringHeapObj.getStringForCode(heap[iHeap + offset]));
+        break;
+      case Slot_LongRef: 
+        writeLongFromHeapIndex(iHeap + offset, 
+                  (iPrevHeap == 0) ? 
+                    0L : 
+                    longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
+        break;
+      case Slot_DoubleRef: 
+        writeDouble(longHeapObj.getHeapValue(heap[iHeap + offset]));
+        break;
+      default: 
+        throw new RuntimeException("internal error");
+      } // end of switch
+    }
+    
+    private int serializeArrayLength(int iHeap) throws IOException {
+      final int length = heap[iHeap + 1];
+      writeVnumber(arrayLength_i, length);
+      return length;
+    }
+    
+    private void serializeDiffWithPrevTypeSlot(SlotKind kind, int iHeap, int offset) throws IOException {
+      int prev = (iPrevHeap == 0) ? 0 : heap[iPrevHeap + offset];
+      writeDiff(kind.i, heap[iHeap + offset], prev);
+    }
+    
+    /**
+     * Method:
+     *   write with deflation into a single byte array stream
+     *     skip if not worth deflating
+     *     skip the Slot_Control stream
+     *     record in the Slot_Control stream, for each deflated stream:
+     *       the Slot index
+     *       the number of compressed bytes
+     *       the number of uncompressed bytes
+     *   add to header:  
+     *     nbr of compressed entries
+     *     the Slot_Control stream size
+     *     the Slot_Control stream
+     *     all the zipped streams
+     *
+     * @throws IOException 
+     */
+    private void collectAndZip() throws IOException {
+      ByteArrayOutputStream baosZipped = new ByteArrayOutputStream(4096);
+      Deflater deflater = new Deflater(compressLevel.lvl, true);
+      deflater.setStrategy(compressStrategy.strat);
+      int nbrEntries = 0;
+      
+      List<Integer> idxAndLen = new ArrayList<Integer>();
+
+      for (int i = 0; i < baosZipSources.length; i++) {
+        ByteArrayOutputStream baos = baosZipSources[i];
+        if (baos != null) {
+          nbrEntries ++;
+          dosZipSources[i].close();
+          long startTime = System.currentTimeMillis();
+          int zipBufSize = Math.max(1024, baos.size() / 100);
+          deflater.reset();
+          DeflaterOutputStream cds = new DeflaterOutputStream(baosZipped, deflater, zipBufSize);       
+          baos.writeTo(cds);
+          cds.close();
+          idxAndLen.add(i);
+          idxAndLen.add((int)(sm.statDetails[i].afterZip = deflater.getBytesWritten()));
+          idxAndLen.add((int)(sm.statDetails[i].beforeZip = deflater.getBytesRead()));
+          sm.statDetails[i].zipTime = System.currentTimeMillis() - startTime;
+        } 
+      }
+      serializedOut.writeInt(nbrEntries);                     // write number of entries
+      for (int i = 0; i < idxAndLen.size();) {
+        serializedOut.write(idxAndLen.get(i++));
+        serializedOut.writeInt(idxAndLen.get(i++));
+        serializedOut.writeInt(idxAndLen.get(i++));
+      }
+      baosZipped.writeTo(serializedOut);                      // write Compressed info
+    }
+    
+//    private DataOutputStream getZipStream(SlotKind kind) {
+//      DataOutputStream dos = dosZipSources[kind.i];
+//      if (null == dos) {
+//        dos = setupOutputStream(kind);
+//      }
+//      return dos;
+//    }
+    
+    public DataOutputStream setupOutputStream(int i) {
+      // set up output stream
+      int size = estimatedZipSize[i];
+      baosZipSources[i] = new ByteArrayOutputStream(size);
+      return dosZipSources[i] = new DataOutputStream(baosZipSources[i]); 
+    }
+
+    private void writeLong(long v, long prev) throws IOException {
+      writeDiff(long_High_i, (int)(v >>> 32), (int)(prev >>> 32));
+      writeDiff(long_Low_i,  (int)v, (int)prev);    
+    }
+
+    /**
+     * String encoding
+     *   Length = 0 - used for null, no offset written
+     *   Length = 1 - used for "", no offset written 
+     *   Length > 0 (subtract 1): used for actual string length
+     *   
+     *   Length < 0 - use (-length) as slot index  (minimum is 1, slot 0 is NULL)
+     *   
+     *   For length > 0, write also the offset.
+     *   
+     */
+    private void writeString(final String s) throws IOException {
+      if (null == s) {
+        writeVnumber(strLength_dos, 0);
+        if (doMeasurement) {
+          sm.statDetails[strLength_i].incr(1);
+        }
+        return;
+      } 
+      
+      int indexOrSeq = os.getIndexOrSeqIndex(s);
+      if (indexOrSeq < 0) {
+        final int v = encodeIntSign(indexOrSeq);
+        writeVnumber(strLength_dos, v);
+        if (doMeasurement) {
+          sm.statDetails[strLength_i].incr(DataIO.lengthVnumber(v));
+        }
+        return;
+      }
+      
+      if (s.length() == 0) {
+        writeVnumber(strLength_dos, encodeIntSign(1));
+        if (doMeasurement) {
+          sm.statDetails[strLength_i].incr(1);
+        }
+        return;
+      }
+      
+      if (s.length() == Integer.MAX_VALUE) {
+        throw new RuntimeException("Cannot serialize string of Integer.MAX_VALUE length - too large.");
+      }
+      
+      final int offset = os.getOffset(indexOrSeq);
+      final int length = encodeIntSign(s.length() + 1);  // all lengths sign encoded because of above
+      writeVnumber(strOffset_dos, offset);
+      writeVnumber(strLength_dos, length);
+      if (doMeasurement) {
+        sm.statDetails[strOffset_i].incr(DataIO.lengthVnumber(offset));
+        sm.statDetails[strLength_i].incr(DataIO.lengthVnumber(length));
+      }
+      if (!only1CommonString) {
+        final int csi = os.getCommonStringIndex(indexOrSeq);
+        writeVnumber(strSeg_dos, csi);
+        if (doMeasurement) {
+          sm.statDetails[strSeg_i].incr(DataIO.lengthVnumber(csi));
+        }
+      }
+    }
+
+    /**
+     * Need to support NAN sets, 
+     * 0x7fc.... for NAN
+     * 0xff8.... for NAN, negative infinity
+     * 0x7f8     for NAN, positive infinity
+     * 
+     * Because 0 occurs frequently, we reserve 
+     * exp of 0 for the value 0
+     *  
+     */
+    
+    private void writeFloat(int raw) throws IOException {
+      if (raw == 0) {
+        writeUnsignedByte(float_Exponent_dos, 0);
+        if (doMeasurement) {
+          sm.statDetails[float_Exponent_i].incr(1);
+        }
+        return;
+      }
+     
+      final int exponent = ((raw >>> 23) & 0xff) + 1;   // because we reserve 0, see above
+      final int revMants = Integer.reverse((raw & 0x007fffff) << 9);  
+      final int mants = (revMants << 1) + ((raw < 0) ? 1 : 0);
+      writeVnumber(float_Exponent_dos, exponent); 
+      writeVnumber(float_Mantissa_Sign_dos, mants);
+      if (doMeasurement) {
+        sm.statDetails[float_Exponent_i].incr(DataIO.lengthVnumber(exponent));
+        sm.statDetails[float_Mantissa_Sign_i].incr(DataIO.lengthVnumber(mants));
+      }
+    }
+
+    private void writeVnumber(int kind, int v) throws IOException {
+      DataIO.writeVnumber(dosZipSources[kind], v);
+      if (doMeasurement) {
+        sm.statDetails[kind].incr(DataIO.lengthVnumber(v));
+      }
+    }
+    
+    private void writeVnumber(int kind, long v) throws IOException {
+      DataIO.writeVnumber(dosZipSources[kind], v);
+      if (doMeasurement) {
+        sm.statDetails[kind].incr(DataIO.lengthVnumber(v));
+      }
+    }
+    
+    // this version doesn't do measurements, caller needs to do it
+    private void writeVnumber(DataOutputStream s, int v) throws IOException {
+      DataIO.writeVnumber(s, v);
+    }
+    
+    // this version doesn't do measurements, caller needs to do it
+    private void writeVnumber(DataOutputStream s, long v) throws IOException {
+      DataIO.writeVnumber(s, v);
+    }
+
+    // this version doesn't do measurements, caller needs to do it    
+    private void writeUnsignedByte(DataOutputStream s, int v) throws IOException {
+      s.write(v);
+    }
+
+    private void writeDouble(long raw) throws IOException {
+      if (raw == 0L) {
+        writeVnumber(double_Exponent_dos, 0);
+        if (doMeasurement) {
+          sm.statDetails[double_Exponent_i].incr(1);
+        }
+        return;
+      }
+      int exponent = (int)((raw >>> 52) & 0x7ff);
+      exponent = exponent - 1023; // rebase so 1.0 = 0
+      if (exponent >= 0) {
+        exponent ++; // skip "0", used above for 0 value
+      }
+      exponent = encodeIntSign(exponent);  
+      final long revMants = Long.reverse((raw & 0x000fffffffffffffL) << 12);  
+      final long mants = (revMants << 1) + ((raw < 0) ? 1 : 0);
+      writeVnumber(double_Exponent_dos, exponent);
+      writeVnumber(double_Mantissa_Sign_dos, mants);
+      if (doMeasurement) {
+        sm.statDetails[double_Exponent_i].incr(DataIO.lengthVnumber(exponent));
+        sm.statDetails[double_Mantissa_Sign_i].incr(DataIO.lengthVnumber(mants));
+      }
+    }
+    
+    private int encodeIntSign(int v) {
+      if (v < 0) {
+        return ((-v) << 1) | 1;
+      }
+      return (v << 1);
+    }
+
+    /**
+     * Encoding:
+     *    bit 6 = sign:   1 = negative
+     *    bit 7 = delta:  1 = delta
+     * @param kind
+     * @param i  runs from iHeap + 3 to end of array
+     * @throws IOException 
+     */
+    private void writeDiff(int kind, int v, int prev) throws IOException {
+      if (v == 0) {
+        writeVnumber(kind, 0);  // a speedup, not a new encoding
+        if (doMeasurement) {
+          sm.statDetails[kind].diffEncoded ++;
+          sm.statDetails[kind].valueLeDiff ++;
+        }
+        return;
+      }
+      
+      if (v == Integer.MIN_VALUE) { // special handling, because abs fails
+        writeVnumber(kind, 2);      // written as -0
+        if (doMeasurement) {
+          sm.statDetails[kind].diffEncoded ++;
+          sm.statDetails[kind].valueLeDiff ++;
+        }
+        return;
+      }
+    
+      if (CHANGE_FS_REFS_TO_SEQUENTIAL && (kind == heapRef_i)) {
+        v = fsStartIndexes.getItemIndex(v);
+        if (prev != 0) {
+          prev = fsStartIndexes.getItemIndex(prev);
+        }
+      }
+
+      final int absV = Math.abs(v);
+      if (((v > 0) && (prev > 0)) ||
+          ((v < 0) && (prev < 0))) {
+        final int diff = v - prev;  // guaranteed not to overflow
+        final int absDiff = Math.abs(diff);
+        writeVnumber(kind, 
+            (absV <= absDiff) ? 
+                ((long)absV << 2)    + ((v < 0) ? 2L : 0L) :
+                ((long)absDiff << 2) + ((diff < 0) ? 3L : 1L));
+        if (doMeasurement) {
+          sm.statDetails[kind].diffEncoded ++;
+          sm.statDetails[kind].valueLeDiff += (absV <= absDiff) ? 1 : 0;
+        }
+        return;
+      }
+      // if get here, then the abs v value is always <= the abs diff value.
+      writeVnumber(kind, ((long)absV << 2) + ((v < 0) ? 2 : 0));
+      if (doMeasurement) {
+        sm.statDetails[kind].diffEncoded ++;
+        sm.statDetails[kind].valueLeDiff ++;
+      }
+    }
+
+    private void writeFromByteArray(SlotKind kind, int startPos, int length) throws IOException {
+      byte_dos.write(byteHeapObj.heap, startPos, length);
+    }
+
+    private void writeFromLongArray(SlotKind kind, int startPos, int length) throws IOException {
+      final long[] h = longHeapObj.heap;
+      final int endPos = startPos + length;
+      long prev = 0;
+      for (int i = startPos; i < endPos; i++) {
+        final long e = h[i];
+        if (kind == Slot_DoubleRef) {
+          writeDouble(e);
+        } else {
+          writeLong(e, prev);
+          prev = e;
+        }
+      }
+    }
+    
+    private void writeFromShortArray(int startPos, int length) throws IOException {
+      final short[] h = shortHeapObj.heap;
+      final int endPos = startPos + length;
+      int prev = 0;
+      for (int i = startPos; i < endPos; i++) {
+        final short e = h[i];
+        writeDiff(short_i, e, prev);
+        prev = e;
+      }
+    }
+
+    /******************************************************************************
+     * Modified Values
+     * Output:
+     *   For each FS that has 1 or more modified values,
+     *     write the heap addr of the FS
+     *     
+     *     For all modified values within the FS:
+     *       if it is an aux array element, write the index in the aux array and the new value
+     *       otherwise, write the slot offset and the new value
+     ******************************************************************************/
+    public class SerializeModifiedFSs {
+
+      final int[] modifiedMainHeapAddrs = cas.getModifiedFSHeapAddrs().toArray();
+      final int[] modifiedFSs = cas.getModifiedFSList().toArray();
+      final int[] modifiedByteHeapAddrs = cas.getModifiedByteHeapAddrs().toArray();
+      final int[] modifiedShortHeapAddrs = cas.getModifiedShortHeapAddrs().toArray();
+      final int[] modifiedLongHeapAddrs = cas.getModifiedLongHeapAddrs().toArray();
+
+      {sortModifications();}
+      
+      final int modMainHeapAddrsLength = eliminateDuplicatesInMods(modifiedMainHeapAddrs);
+      final int modFSsLength = eliminateDuplicatesInMods(modifiedFSs);
+      final int modByteHeapAddrsLength = eliminateDuplicatesInMods(modifiedByteHeapAddrs);
+      final int modShortHeapAddrsLength = eliminateDuplicatesInMods(modifiedShortHeapAddrs);
+      final int modLongHeapAddrsLength = eliminateDuplicatesInMods(modifiedLongHeapAddrs);
+
+      // ima           - index into modified arrays
+      // ixx, iPrevxxx - index in heap being changed
+      //                 value comes via the main heap or aux heaps
+      
+      int imaModMainHeap = 0;
+      int imaModByteRef = 0;
+      int imaModShortRef = 0;
+      int imaModLongRef = 0;
+ 
+      // previous value - for things diff encoded
+      int vPrevModInt = 0;
+      int vPrevModHeapRef = 0;
+      short vPrevModShort = 0;
+      long vPrevModLong = 0;
+      
+      int iHeap;
+      TypeInfo typeInfo;
+      
+      private void serializeModifiedFSs() throws IOException {
+        iPrevHeap = 0;   
+        // write out number of modified Feature Structures
+        writeVnumber(control_dos, modFSsLength);
+        // iterate over all modified feature structures
+        /**
+         * Theorems about these data
+         *   1) Assumption: if an AuxHeap array is modified, its heap FS is in the list of modFSs
+         *   2) FSs with AuxHeap values have increasing ref values into the Aux heap as FS addr increases
+         *      (because the ref is not updateable).
+         *   3) Assumption: String array element modifications are main heap slot changes
+         *      and recorded as such
+         */
+        
+        for (int i = 0; i < modFSsLength; i++) {
+          iHeap = modifiedFSs[i];     
+          final int tCode = heap[iHeap];
+          typeInfo = getTypeInfo(tCode);
+          
+          // write out the address of the modified FS
+          writeVnumber(fsIndexes_dos, iHeap - iPrevHeap);
+          // delay updating iPrevHeap until end of "for" loop
+          
+          /**************************************************
+           * handle aux byte, short, long array modifications
+           **************************************************/
+          if (typeInfo.isArray && (!typeInfo.isHeapStoredArray)) {
+            writeAuxHeapMods();           
+          } else { 
+            writeMainHeapMods(); 
+          }  // end of processing 1 modified FS
+          iPrevHeap = iHeap;
+        }  // end of for loop over all modified FSs
+      }  // end of method
+      
+      // sort and remove duplicates
+      private void sortModifications() {
+        Arrays.sort(modifiedMainHeapAddrs);
+        Arrays.sort(modifiedFSs);
+        Arrays.sort(modifiedByteHeapAddrs);
+        Arrays.sort(modifiedShortHeapAddrs);
+        Arrays.sort(modifiedLongHeapAddrs);
+      }
+      
+      private int eliminateDuplicatesInMods(final int[] sorted) {
+        int length = sorted.length;
+        if (length < 2) {
+          return length;
+        }
+        
+        int prev = sorted[0];
+        int to = 1;
+        for(int from = 1; from < length; from++) {
+          int s = sorted[from];
+          if (s == prev) {
+            continue;
+          }
+          prev = s;
+          sorted[to] = s;
+          to++;
+        }    
+        return to;  // to is length
+      }
+
+      private int countModifiedSlotsInFs(int fsLength) {
+        return countModifiedSlots(iHeap, fsLength, modifiedMainHeapAddrs, imaModMainHeap, modMainHeapAddrsLength);
+      }
+      
+      private int countModifiedSlotsInAuxHeap(int[] modifiedAddrs, int indexInModAddrs, int length) {
+        return countModifiedSlots(heap[iHeap + 2], heap[iHeap + 1], modifiedAddrs, indexInModAddrs, length);
+      }
+      
+      private int countModifiedSlots(int firstAddr, int length, int[] modifiedAddrs, int indexInModAddrs, int modAddrsLength) {
+        if (0 == length) {
+          throw new RuntimeException();  // can't happen
+        }
+        final int nextAddr = firstAddr + length;
+        int nextModAddr = modifiedAddrs[indexInModAddrs]; 
+        if ((firstAddr > nextModAddr) ||
+            (nextModAddr >= nextAddr)) {
+          throw new RuntimeException(); // never happen - must have one slot at least modified in this fs          
+        }
+        int i = 1;
+        for (;; i++) {
+          if ((indexInModAddrs + i) == modAddrsLength) {
+            break;
+          }
+          nextModAddr = modifiedAddrs[indexInModAddrs + i];
+          if (nextModAddr >= nextAddr) {
+            break;
+          }
+        }
+        return i;
+      }
+      
+      private void writeMainHeapMods() throws IOException {
+        final int fsLength = incrToNextFs(heap, iHeap, typeInfo);
+        final int numberOfModsInFs = countModifiedSlotsInFs(fsLength);
+        writeVnumber(fsIndexes_dos, numberOfModsInFs);
+        int iPrevOffsetInFs = 0;
+
+        for (int i = 0; i < numberOfModsInFs; i++) {
+          final int nextMainHeapIndex = modifiedMainHeapAddrs[imaModMainHeap++];
+          final int offsetInFs = nextMainHeapIndex - iHeap;
+          
+          writeVnumber(fsIndexes_dos, offsetInFs - iPrevOffsetInFs);
+          iPrevOffsetInFs = offsetInFs;
+          
+          final SlotKind kind = typeInfo.getSlotKind(typeInfo.isArray ? 2 : offsetInFs);
+          
+          switch (kind) {
+          case Slot_HeapRef:
+            vPrevModHeapRef = writeIntOrHeapRef(heapRef_i, nextMainHeapIndex, vPrevModHeapRef);
+            break;
+          case Slot_Int:
+            vPrevModInt = writeIntOrHeapRef(int_i, nextMainHeapIndex, vPrevModInt);
+            break;
+          case Slot_Short:
+            vPrevModShort = (short)writeIntOrHeapRef(int_i, nextMainHeapIndex, vPrevModShort);             
+            break;
+          case Slot_LongRef:
+            vPrevModLong = writeLongFromHeapIndex(nextMainHeapIndex, vPrevModLong); 
+            break;
+          case Slot_Byte: case Slot_Boolean:
+            byte_dos.write(heap[nextMainHeapIndex]);
+            break;
+          case Slot_Float:
+            writeFloat(heap[nextMainHeapIndex]);
+            break;
+          case Slot_StrRef:
+            writeString(stringHeapObj.getStringForCode(heap[nextMainHeapIndex]));
+            break;
+          case Slot_DoubleRef:
+            writeDouble(longHeapObj.getHeapValue(heap[nextMainHeapIndex]));
+            break;
+          default:
+            throw new RuntimeException();
+          }
+
+        }  // end of looping for all modified slots in this FS
+      }
+      
+      private void writeAuxHeapMods() throws IOException {
+        final int auxHeapIndex = heap[iHeap + 2];
+        int iPrevOffsetInAuxArray = 0;
+        
+        final SlotKind kind = typeInfo.getSlotKind(2);  // get kind of element
+        final boolean isAuxByte = ((kind == Slot_BooleanRef) || (kind == Slot_ByteRef));
+        final boolean isAuxShort = (kind == Slot_ShortRef);
+        final boolean isAuxLong = ((kind == Slot_LongRef) || (kind == Slot_DoubleRef));
+        
+        if (!(isAuxByte | isAuxShort | isAuxLong)) {
+          throw new RuntimeException();  // never happen
+        }
+        
+        final int[] modXxxHeapAddrs = isAuxByte  ? modifiedByteHeapAddrs :
+                                      isAuxShort ? modifiedShortHeapAddrs :
+                                                   modifiedLongHeapAddrs;
+        final int modXxxHeapAddrsLength = isAuxByte  ? modByteHeapAddrsLength :
+                                          isAuxShort ? modShortHeapAddrsLength :
+                                                       modLongHeapAddrsLength;
+        int imaModXxxRef = isAuxByte  ? imaModByteRef :
+                                 isAuxShort ? imaModShortRef : 
+                                              imaModLongRef;
+        
+        final int numberOfModsInAuxHeap = countModifiedSlotsInAuxHeap(modXxxHeapAddrs, imaModXxxRef, modXxxHeapAddrsLength);
+        writeVnumber(fsIndexes_dos, numberOfModsInAuxHeap);
+        
+        for (int i = 0; i < numberOfModsInAuxHeap; i++) {
+          final int nextModAuxIndex = modXxxHeapAddrs[imaModXxxRef++];
+          final int offsetInAuxArray = nextModAuxIndex - auxHeapIndex;
+          
+          writeVnumber(fsIndexes_dos, offsetInAuxArray - iPrevOffsetInAuxArray);
+          iPrevOffsetInAuxArray = offsetInAuxArray;
+          
+          if (isAuxByte) {
+            writeUnsignedByte(byte_dos, byteHeapObj.getHeapValue(nextModAuxIndex));
+          } else if (isAuxShort) {
+            final short v = shortHeapObj.getHeapValue(nextModAuxIndex);
+            writeDiff(int_i, v, vPrevModShort);
+            vPrevModShort = v;
+          } else {
+            long v = longHeapObj.getHeapValue(nextModAuxIndex);
+            if (kind == Slot_LongRef) {
+              writeLong(v, vPrevModLong);
+              vPrevModLong = v;    
+            } else {
+              writeDouble(v);
+            }
+          }
+          
+          if (isAuxByte) {
+            imaModByteRef++;
+          } else if (isAuxShort) {
+            imaModShortRef++;
+          } else {
+            imaModLongRef++;
+          }
+          
+        }
+      }
+    } // end of class definition for SerializeModifiedFSs
+    
+  }  // end of class definition for Serializer
+  
+  /**
+   * Class instantiated once per deserialization
+   * Multiple deserializations in parallel supported, with
+   * multiple instances of this
+   */
+  private class Deserializer {
+    
+    final private CASImpl cas;  // cas being serialized
+    final private DataInput deserIn;
+
+    final private DataInputStream[] dataInputs = new DataInputStream[NBR_SLOT_KIND_ZIP_STREAMS];
+    private Inflater[] inflaters = new Inflater[NBR_SLOT_KIND_ZIP_STREAMS];
+
+    private int[] heap;           // main heap
+    private int heapStart;
+    private int heapEnd;
+    
+    private IntVector fixupsNeeded;  // for deserialization, the "fixups" for relative heap refs needed
+    private StringHeap stringHeapObj;
+    private LongHeap longHeapObj;
+    private ShortHeap shortHeapObj;
+    private ByteHeap byteHeapObj;
+    
+    private int stringTableOffset;
+    
+    /**
+     * Cache sharable common values in aux heaps
+     * Values must be in aux heap, but not part of arrays there
+     *   so that rules out boolean, byte, and shorts
+     */
+    private int longZeroIndex = -1; // also used for double 0 indix
+    private int double1Index = -1;
+
+    final private boolean isDelta;        // if true, a delta is being deserialized
+    final private ComprItemRefs fsStartIndexes = (CHANGE_FS_REFS_TO_SEQUENTIAL) ? new ComprItemRefs() : null;
+    private String[] readCommonString;
+
+    private TypeInfo typeInfo; // type info for the current type being serialized
+
+    private int iPrevHeap;        // 0 or heap addr of previous instance of current type
+    private boolean only1CommonString;
+
+    // speedups
+    
+    final private DataInputStream arrayLength_dis;
+    final private DataInputStream heapRef_dis;
+    final private DataInputStream int_dis;
+    final private DataInputStream byte_dis;
+    final private DataInputStream short_dis;
+    final private DataInputStream typeCode_dis;
+    final private DataInputStream strOffset_dis;
+    final private DataInputStream strLength_dis;
+    final private DataInputStream long_High_dis;
+    final private DataInputStream long_Low_dis;
+    final private DataInputStream float_Mantissa_Sign_dis;
+    final private DataInputStream float_Exponent_dis;
+    final private DataInputStream double_Mantissa_Sign_dis;
+    final private DataInputStream double_Exponent_dis;
+    final private DataInputStream fsIndexes_dis;
+    final private DataInputStream strChars_dis;
+    final private DataInputStream control_dis;
+    final private DataInputStream strSeg_dis;
+
+    /**
+     * Called after header was read and determined that
+     * this was a compressed binary 
+     * @param cas
+     * @param deserIn
+     * @throws IOException 
+     */
+    Deserializer(CASImpl cas, DataInput deserIn, boolean isDelta) throws IOException {
+      this.cas = cas;
+      this.deserIn = deserIn;
+      this.isDelta = isDelta;
+      
+      stringHeapObj = cas.getStringHeap();
+      longHeapObj   = cas.getLongHeap();
+      shortHeapObj  = cas.getShortHeap();
+      byteHeapObj   = cas.getByteHeap();
+
+      deserIn.readInt();    // reserved to record additional version info
+      final int nbrEntries = deserIn.readInt();  // number of compressed streams
+      
+      IntVector idxAndLen = new IntVector(nbrEntries * 2);
+      
+      for (int i = 0; i < nbrEntries; i++) {
+        idxAndLen.add(deserIn.readUnsignedByte());  // slot ordinal number
+        idxAndLen.add(deserIn.readInt());           // compressed size, bytes
+        idxAndLen.add(deserIn.readInt());           // decompressed size, bytes (not currently used)
+      }
+      
+      for (int i = 0; i < idxAndLen.size();) {
+        setupReadStream(idxAndLen.get(i++), idxAndLen.get(i++), idxAndLen.get(i++));
+      }
+
+      arrayLength_dis = dataInputs[arrayLength_i];
+      heapRef_dis = dataInputs[heapRef_i];
+      int_dis = dataInputs[int_i];
+      byte_dis = dataInputs[byte_i];
+      short_dis = dataInputs[short_i];
+      typeCode_dis = dataInputs[typeCode_i];
+      strOffset_dis = dataInputs[strOffset_i];
+      strLength_dis = dataInputs[strLength_i];
+      long_High_dis = dataInputs[long_High_i];
+      long_Low_dis = dataInputs[long_Low_i];
+      float_Mantissa_Sign_dis = dataInputs[float_Mantissa_Sign_i];
+      float_Exponent_dis = dataInputs[float_Exponent_i];
+      double_Mantissa_Sign_dis = dataInputs[double_Mantissa_Sign_i];
+      double_Exponent_dis = dataInputs[double_Exponent_i];
+      fsIndexes_dis = dataInputs[fsIndexes_i];
+      strChars_dis = dataInputs[strChars_i];
+     
+      control_dis = dataInputs[control_i];
+      strSeg_dis = dataInputs[strSeg_i];
+
+    }
+    
+    private void deserialize() throws IOException {
+      
+      /************************************************
+       * Setup all the input streams with inflaters
+       ************************************************/
+      long startTime1 = System.currentTimeMillis();
+      
+      /************************************************
+       * Read in the common string(s)
+       ************************************************/
+      int lenCmnStrs = readVnumber(strChars_dis);
+      readCommonString = new String[lenCmnStrs];
+      for (int i = 0; i < lenCmnStrs; i++) {
+        readCommonString[i] = DataIO.readUTFv(strChars_dis);
+      }
+      only1CommonString = lenCmnStrs == 1;
+      /***************************
+       * Prepare to walk main heap
+       ***************************/
+      int deltaHeapSize = readVnumber(control_dis);         
+      final Heap heapObj = cas.getHeap();
+      
+      heapStart = isDelta ? heapObj.getNextId() : 0;
+      stringTableOffset = isDelta ? (stringHeapObj.getSize() - 1) : 0;
+      
+      if (isDelta) {
+        heapObj.grow(deltaHeapSize);
+      } else {
+        heapObj.reinitSizeOnly(deltaHeapSize);
+      } 
+      
+      heapEnd = heapStart + deltaHeapSize; 
+      heap = heapObj.heap;
+
+      resetIprevious();
+      
+      if (heapStart == 0) {
+        heapStart = 1;  // slot 0 not serialized, it's null / 0
+      }
+
+      if (CHANGE_FS_REFS_TO_SEQUENTIAL && (heapStart > 1)) {
+        initFsStartIndexes(fsStartIndexes, heap, 1, heapStart, null);
+      }
+      fixupsNeeded = new IntVector(Math.max(16, heap.length / 10));
+
+      /***************************
+       * walk main heap
+       ***************************/
+
+      for (int iHeap = heapStart; iHeap < heapEnd; iHeap += incrToNextFs(heap, iHeap, typeInfo)) {
+        if (CHANGE_FS_REFS_TO_SEQUENTIAL) {
+          fsStartIndexes.addItemAddr(iHeap);
+        }        
+        int tCode = heap[iHeap] = readVnumber(typeCode_dis); // get type code      
+        typeInfo = getTypeInfo(tCode);
+        iPrevHeap = typeInfo.iPrevHeap;
+
+        if (typeInfo.isHeapStoredArray) {
+          readHeapStoredArray(iHeap);
+        } else if (typeInfo.isArray) {
+          readNonHeapStoredArray(iHeap);
+        } else {
+          for (int i = 1; i < typeInfo.slotKinds.length + 1; i++) {
+            readByKind(iHeap, i);
+          }
+        }
+        
+        typeInfo.iPrevHeap = iHeap;  // make this one the "prev" one for subsequent testing
+      }
+      
+      if (CHANGE_FS_REFS_TO_SEQUENTIAL) {
+        fsStartIndexes.finishSetup();
+        final int end = fixupsNeeded.size();
+        for (int i = 0; i < end; i++) {
+          final int heapAddrToFix = fixupsNeeded.get(i);
+          heap[heapAddrToFix] = fsStartIndexes.getItemAddr(heap[heapAddrToFix]);
+        }        
+      }
+      
+      readIndexedFeatureStructures();
+
+      if (isDelta) {
+        (new ReadModifiedFSs()).readModifiedFSs();
+      }
+
+      closeDataInputs();
+      System.out.format("Deserialize took %,d ms%n", System.currentTimeMillis() - startTime1);
+    }
+    
+    private void readNonHeapStoredArray(int iHeap) throws IOException {
+      final int length = readArrayLength(iHeap);
+      if (length == 0) {
+        return;
+      }
+      SlotKind refKind = typeInfo.getSlotKind(2);
+      switch (refKind) {
+      case Slot_BooleanRef: case Slot_ByteRef:
+        heap[iHeap + 2] = readIntoByteArray(length);
+        break; 
+      case Slot_ShortRef:
+        heap[iHeap + 2] = readIntoShortArray(length);
+        break; 
+      case Slot_LongRef: case Slot_DoubleRef:
+        heap[iHeap + 2] = readIntoLongArray(refKind, length);
+        break; 
+      default:
+        throw new RuntimeException();
+      }
+    }
+    
+    private int readArrayLength(int iHeap) throws IOException {
+      return heap[iHeap + 1] = readVnumber(arrayLength_dis);
+    }
+
+    private void readHeapStoredArray(int iHeap) throws IOException {
+      final int length = readArrayLength(iHeap);
+      // output values
+      // special case 0 and 1st value
+      if (length == 0) {
+        return;
+      }
+      SlotKind arrayElementKind = typeInfo.slotKinds[1];
+      final int endi = iHeap + length + 2;
+      switch (arrayElementKind) {
+      case Slot_HeapRef: case Slot_Int: case Slot_Short:
+        {
+          int prev = (iPrevHeap == 0) ? 0 :
+                     (heap[iPrevHeap + 1] == 0) ? 0 :
+                      heap[iPrevHeap + 2]; 
+          for (int i = iHeap + 2; i < endi; i++) {
+            final int v = heap[i] = readDiff(arrayElementKind, prev);
+            prev = v;
+            if (arrayElementKind == Slot_HeapRef) {
+              fixupsNeeded.add(i);
+            }
+          }
+        }
+        break;
+      case Slot_Float: 
+        for (int i = iHeap + 2; i < endi; i++) {
+          heap[i] = readFloat();
+        }
+        break;
+      case Slot_StrRef:
+        for (int i = iHeap + 2; i < endi; i++) {
+          heap[i] = readString();
+        }
+        break;
+        
+      default: throw new RuntimeException("internal error");
+      } // end of switch    
+    }
+          
+    private void readByKind(int iHeap, int offset) throws IOException {
+      SlotKind kind = typeInfo.getSlotKind(offset);
+      
+      switch (kind) {
+      case Slot_Int: case Slot_Short:
+        readDiffWithPrevTypeSlot(kind, iHeap, offset);
+        break;
+      case Slot_Float:
+        heap[iHeap + offset] = readFloat();
+        break;
+      case Slot_Boolean: case Slot_Byte:
+        heap[iHeap + offset] = byte_dis.readByte();
+        break;
+      case Slot_HeapRef:
+        readDiffWithPrevTypeSlot(kind, iHeap, offset);
+        if (kind == Slot_HeapRef) {
+          fixupsNeeded.add(iHeap + offset);
+        }
+        break;
+      case Slot_StrRef: 
+        heap[iHeap + offset] = readString();
+        break;
+      case Slot_LongRef: {
+        long v = readLong(kind, (iPrevHeap == 0) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
+        if (v == 0L) {
+          if (longZeroIndex == -1) {
+            longZeroIndex = longHeapObj.addLong(0L);
+          }
+          heap[iHeap + offset] = longZeroIndex;
+        } else {
+          heap[iHeap + offset] = longHeapObj.addLong(v);
+        }
+        break;
+      }
+      case Slot_DoubleRef: {
+        long v = readDouble();
+        if (v == 0L) {
+          if (longZeroIndex == -1) {
+            longZeroIndex = longHeapObj.addLong(0L);
+          }
+          heap[iHeap + offset] = longZeroIndex;
+        } else if (v == DBL_1) {
+          if (double1Index == -1) {
+            double1Index = longHeapObj.addLong(DBL_1);
+          }
+          heap[iHeap + offset] = double1Index;
+        } else {
+          heap[iHeap + offset] = longHeapObj.addLong(v);
+        }
+        break;
+      }
+      default: 
+        throw new RuntimeException("internal error");                
+      } // end of switch
+    }
+
+    private void readIndexedFeatureStructures() throws IOException {
+      final int nbrViews = readVnumber(control_dis);
+      final int nbrSofas = readVnumber(control_dis);
+
+      IntVector fsIndexes = new IntVector(nbrViews + nbrSofas + 100);
+      fsIndexes.add(nbrViews);
+      fsIndexes.add(nbrSofas);
+      for (int i = 0; i < nbrSofas; i++) {
+        fsIndexes.add(readVnumber(control_dis));
+      }
+        
+      for (int i = 0; i < nbrViews; i++) {
+        readFsxPart(fsIndexes);     // added FSs
+        if (isDelta) {
+          readFsxPart(fsIndexes);   // removed FSs
+          readFsxPart(fsIndexes);   // reindexed FSs
+        }
+      }
+      
+      if (isDelta) {
+        // getArray avoids copying.
+        // length is too long, but is never accessed
+        cas.reinitDeltaIndexedFSs(fsIndexes.getArray());
+      } else {
+        cas.reinitIndexedFSs(fsIndexes.getArray());
+      }
+    }
+
+    /**
+     * Each FS index is sorted, and output is by delta 
+     */
+    private void readFsxPart(IntVector fsIndexes) throws IOException {
+      final int nbrEntries = readVnumber(fsIndexes_dis);
+      fsIndexes.add(nbrEntries);      
+      int prev = 0;
+      
+      for (int i = 0; i < nbrEntries; i++) {
+        int v = readVnumber(fsIndexes_dis) + prev;
+        prev = v;
+        if (CHANGE_FS_REFS_TO_SEQUENTIAL) {
+          v = fsStartIndexes.getItemAddr(v);
+        }
+        fsIndexes.add(v);
+      }
+    } 
+
+    private void setupReadStream(
+        int slotIndex, 
+        int bytesCompr,
+        int bytesOrig) throws IOException {
+      byte[] b = new byte[bytesCompr + 1];
+      deserIn.readFully(b, 0, bytesCompr);  // this leaves 1 extra 0 byte at the end
+      // which may be required by Inflater with nowrap option - see Inflater javadoc
+      
+      // testing inflate speed
+//      long startTime = System.currentTimeMillis();
+//      inflater.reset();
+//      inflater.setInput(b);
+//      byte[] uncompressed = new byte[bytesOrig];
+//      int uncompressedLength = 0;
+//      try {
+//        uncompressedLength = inflater.inflate(uncompressed);
+//      } catch (DataFormatException e) {
+//        throw new RuntimeException(e);
+//      }
+//      if (uncompressedLength != bytesOrig) {
+//        throw new RuntimeException();
+//      }
+//      System.out.format("Decompress %s took %,d ms%n", 
+//          SlotKind.values()[slotIndex], System.currentTimeMillis() - startTime); 
+//      
+//      dataInputs[slotIndex] = new DataInputStream(new ByteArrayInputStream(uncompressed));
+      Inflater inflater = new Inflater(true);
+      inflaters[slotIndex] = inflater;  // save to be able to call end() when done. 
+      ByteArrayInputStream baiStream = new ByteArrayInputStream(b);      
+      int zipBufSize = Math.max(32768, bytesCompr);
+      InflaterInputStream iis = new InflaterInputStream(baiStream, inflater, zipBufSize);
+      dataInputs[slotIndex] = new DataInputStream(new BufferedInputStream(iis, zipBufSize));
+    }
+    
+    private void closeDataInputs() {
+      for (DataInputStream is : dataInputs) {
+        if (null != is){
+          try {
+            is.close();
+          } catch (IOException e) {
+          }
+        }
+      }
+      // release any space inflater holding on to
+      for (Inflater inflater : inflaters) {
+        if (null != inflater) {
+          inflater.end();
+        }
+      }
+    }
+    
+    private DataInput getInputStream(SlotKind kind) {
+      return dataInputs[kind.i];
+    }
+
+    private int readVnumber(DataInputStream dis) throws IOException {
+      return DataIO.readVnumber(dis);
+    }
+
+    private long readVlong(DataInputStream dis) throws IOException {
+      return DataIO.readVlong(dis);
+    }
+
+    private int readIntoByteArray(int length) throws IOException {
+      int startPos = byteHeapObj.reserve(length);
+      byte_dis.readFully(byteHeapObj.heap, startPos, length);
+      return startPos;
+    }
+
+    private int readIntoShortArray(int length) throws IOException {
+      final int startPos = shortHeapObj.reserve(length);
+      final short[] h = shortHeapObj.heap;
+      final int endPos = startPos + length;
+      short prev = 0;
+      for (int i = startPos; i < endPos; i++) {
+        h[i] = prev = (short)(readDiff(short_dis, prev));
+      }
+      return startPos;   
+    }
+    
+    private int readIntoLongArray(SlotKind kind, int length) throws IOException {
+      final int startPos = longHeapObj.reserve(length);
+      final long[] h = longHeapObj.heap;
+      final int endPos = startPos + length;
+      long prev = 0;
+      for (int i = startPos; i < endPos; i++) {
+        h[i] = prev = readLong(kind, prev);
+      }
+      return startPos;   
+    }
+
+    private void readDiffWithPrevTypeSlot(SlotKind kind, int iHeap, int offset) throws IOException {
+      int prev = (iPrevHeap == 0) ? 0 : heap[iPrevHeap + offset];
+      heap[iHeap + offset] = readDiff(kind, prev);
+    }
+
+    private int readDiff(SlotKind kind, int prev) throws IOException {
+      return readDiff(getInputStream(kind), prev);
+    }
+    
+    private int readDiff(DataInput in, int prev) throws IOException {
+      final long encoded = readVlong(in);
+      final boolean isDelta = (0 != (encoded & 1L));
+      final boolean isNegative = (0 != (encoded & 2L));
+      int v = (int)(encoded >>> 2);
+      if (isNegative) {
+        if (v == 0) {
+          return Integer.MIN_VALUE;
+        }
+        v = -v;
+      }
+      if (isDelta) {
+        v = v + prev;
+      }
+      return v;  
+    }
+        
+    private long readLong(SlotKind kind, long prev) throws IOException {
+      if (kind == Slot_DoubleRef) {
+        return readDouble();
+      }
+    
+      final int vh = readDiff(long_High_dis, (int) (prev >>> 32));
+      final int vl = readDiff(long_Low_dis, (int) prev);
+      final long v = (((long)vh) << 32) | (0xffffffffL & (long)vl);
+      return v;
+    }
+    
+    
+    private int readFloat() throws IOException {
+      final int exponent = readVnumber(float_Exponent_dis);  
+      if (exponent == 0) {
+        return 0;
+      }
+      int mants = readVnumber(float_Mantissa_Sign_dis);
+      final boolean isNegative = (mants & 1) == 1;
+      mants = mants >>> 1;
+      // the next parens needed to get around eclipse / java bug
+      mants = (Integer.reverse(mants) >>> 9);
+          
+      return ((exponent - 1) << 23) |
+             mants | 
+             ((isNegative) ? 0x80000000 : 0);        
+    }
+       
+    private int decodeIntSign(int v) {
+      if (1 == (v & 1)) {
+        return - (v >>> 1);
+      }
+      return v >>> 1;
+    }
+    
+    private long readDouble() throws IOException {
+      int exponent = readVnumber(double_Exponent_dis);
+      if (exponent == 0) {
+        return 0L;
+      }
+      long mants = readVlong(double_Mantissa_Sign_dis);
+      return decodeDouble(mants, exponent);
+    }
+    
+    private long decodeDouble(long mants, int exponent) {
+      exponent = decodeIntSign(exponent);
+      if (exponent > 0) {
+        exponent --;  
+      }
+      exponent = exponent + 1023; 
+      long r = ((long)((exponent) & 0x7ff)) << 52;
+      final boolean isNegative = (1 == (mants & 1));
+      mants = Long.reverse(mants >>> 1) >>> 12;
+      r = r | mants | (isNegative ? 0x8000000000000000L : 0);
+      return r;
+    }
+            
+    private long readVlong(DataInput dis) throws IOException {
+      return DataIO.readVlong(dis);
+    }
+      
+    private int readString() throws IOException {
+      int length = decodeIntSign(readVnumber(strLength_dis));
+      if (0 == length) {
+        return 0;
+      }
+      if (1 == length) {
+        return stringHeapObj.addString("");
+      }
+      
+      if (length < 0) {  // in this case, -length is the slot index
+        return stringTableOffset - length;
+      }
+      int offset = readVnumber(strOffset_dis);
+      int segmentIndex = (only1CommonString) ? 0 :
+        readVnumber(strSeg_dis);
+      String s =  readCommonString[segmentIndex].substring(offset, offset + length - 1);
+      return stringHeapObj.addString(s);
+    }
+
+    /******************************************************************************
+     * Modified Values
+     * 
+     * Modified heap values need fsStartIndexes conversion
+     ******************************************************************************/
+
+    class ReadModifiedFSs {
+      
+      // previous value - for things diff encoded
+      int vPrevModInt = 0;
+      int vPrevModHeapRef = 0;
+      short vPrevModShort = 0;
+      long vPrevModLong = 0;
+      int iHeap;
+      TypeInfo typeInfo;
+
+
+      private void readModifiedFSs() throws IOException {
+        final int modFSsLength = readVnumber(control_dis);
+        iPrevHeap = 0;
+                 
+        for (int i = 0; i < modFSsLength; i++) {
+          iHeap = readVnumber(fsIndexes_dis) + iPrevHeap;
+          iPrevHeap = iHeap;
+  
+          final int tCode = heap[iHeap];
+          typeInfo = getTypeInfo(tCode);
+          
+          final int numberOfModsInThisFs = readVnumber(fsIndexes_dis); 
+  
+          /**************************************************
+           * handle aux byte, short, long array modifications
+           **************************************************/
+          if (typeInfo.isArray && (!typeInfo.isHeapStoredArray)) {
+            readModifiedAuxHeap(numberOfModsInThisFs);
+          } else {
+            readModifiedMainHeap(numberOfModsInThisFs);
+          }
+        }
+      }
+      
+      private void readModifiedAuxHeap(int numberOfMods) throws IOException {
+        int prevOffset = 0;
+               
+        final int auxHeapIndex = heap[iHeap + 2];
+        final SlotKind kind = typeInfo.getSlotKind(2);  // get kind of element
+        final boolean isAuxByte = ((kind == Slot_BooleanRef) || (kind == Slot_ByteRef));
+        final boolean isAuxShort = (kind == Slot_ShortRef);
+        final boolean isAuxLong = ((kind == Slot_LongRef) || (kind == Slot_DoubleRef));
+        if (!(isAuxByte | isAuxShort | isAuxLong)) {
+          throw new RuntimeException();  // never happen
+        }
+        
+        for (int i2 = 0; i2 < numberOfMods; i2++) {
+          final int offset = readVnumber(fsIndexes_dis) + prevOffset;
+          prevOffset = offset;
+          
+          if (isAuxByte) {
+            byteHeapObj.setHeapValue(byte_dis.readByte(), auxHeapIndex + offset);
+          } else if (isAuxShort) {
+            final short v = (short)readDiff(int_dis, vPrevModShort);
+            vPrevModShort = v;
+            shortHeapObj.setHeapValue(v, auxHeapIndex + offset);
+          } else {
+            final long v = readLong(kind, vPrevModLong);
+            if (kind == Slot_LongRef) {
+              vPrevModLong = v;
+            }
+            longHeapObj.setHeapValue(v, auxHeapIndex + offset);
+          }    
+        }
+      }
+      
+      private void readModifiedMainHeap(int numberOfMods) throws IOException {
+        int iPrevOffsetInFs = 0;
+        for (int i = 0; i < numberOfMods; i++) {
+          final int offsetInFs = readVnumber(fsIndexes_dis) + iPrevOffsetInFs;
+          iPrevOffsetInFs = offsetInFs;
+          final SlotKind kind = typeInfo.getSlotKind(typeInfo.isArray ? 2 : offsetInFs);
+          
+          switch (kind) {
+          case Slot_HeapRef: {
+              int v = readDiff(heapRef_dis, vPrevModHeapRef);
+              vPrevModHeapRef = v;
+              if (CHANGE_FS_REFS_TO_SEQUENTIAL) {
+                v = fsStartIndexes.getItemAddr(v);
+              }
+              heap[iHeap + offsetInFs] = v;
+            }
+            break;
+          case Slot_Int: {
+              final int v = readDiff(int_dis, vPrevModInt);
+              vPrevModInt = v;
+              heap[iHeap + offsetInFs] = v;
+            }
+            break;
+          case Slot_Short: {
+              final int v = readDiff(int_dis, vPrevModShort);
+              vPrevModShort = (short)v;
+              heap[iHeap + offsetInFs] = v;
+            }
+            break;
+          case Slot_LongRef: case Slot_DoubleRef: {
+              final long v = readLong(kind, vPrevModLong);
+              if (kind == Slot_LongRef) {
+                vPrevModLong = v;
+              }
+              heap[iHeap + offsetInFs] = longHeapObj.addLong(v);
+            }
+            break;
+          case Slot_Byte: case Slot_Boolean:
+            heap[iHeap + offsetInFs] = byte_dis.readByte();
+            break;
+          case Slot_Float:
+            heap[iHeap + offsetInFs] = readFloat();
+            break;
+          case Slot_StrRef:
+            heap[iHeap + offsetInFs] = readString();
+            break;
+         default:
+            throw new RuntimeException();
+          }
+        }   
+      }
+    }
+  }
+
+  /********************************************************************
+   * methods common to serialization / deserialization etc.
+   ********************************************************************/
+  
+  private int incrToNextFs(int[] heap, int iHeap, TypeInfo typeInfo) {
+    if (typeInfo.isHeapStoredArray) {
+      return 2 + heap[iHeap + 1];
+    } else {
+      return 1 + typeInfo.slotKinds.length;
+    }
+  }
+
+  
+  private void initFsStartIndexes (ComprItemRefs fsStartIndexes, int[] heap, int heapStart, int heapEnd, int[] histo) {
+    for (int iHeap = 1; iHeap < heapEnd;) {
+      fsStartIndexes.addItemAddr(iHeap);
+      final int tCode = heap[iHeap];
+      if ((null != histo) && (iHeap >= heapStart)) {
+        histo[tCode] ++;
+      }
+      TypeInfo typeInfo = getTypeInfo(tCode);
+      iHeap += incrToNextFs(heap, iHeap, typeInfo);
+    }
+    fsStartIndexes.finishSetup();
+  }  
+
+  private void resetIprevious() {
+    for (int i = 1; i < typeInfoArray.length; i++) {
+      TypeInfo typeInfo = typeInfoArray[i];  // skip 0 which is null
+      if (null != typeInfo) {
+        typeInfo.iPrevHeap = 0;
+      }
+    }
+  } 
+
+  // this method is required, instead of merely making
+  // a "new" instance, so that
+  // the containing instance of BinaryCasSerDes4 can be
+  // accessed for the type info
+  
+  public CasCompare getCasCompare() {
+    return new CasCompare();
+  }
+  
+  public class CasCompare {
+    /** 
+     * Compare 2 CASes for equal
+     * The layout of refs to aux heaps does not have to match
+     */
+      private CASImpl c1;
+      private CASImpl c2;
+      private Heap c1HO;
+      private Heap c2HO;
+      private int[] c1heap;
+      private int[] c2heap;
+      private TypeInfo typeInfo;
+      private int iHeap;
+      
+    public boolean compareCASes(CASImpl c1, CASImpl c2) {
+      this.c1 = c1;
+      this.c2 = c2;
+      c1HO = c1.getHeap();
+      c2HO = c2.getHeap();
+      final int endi = c1HO.getCellsUsed();
+      final int end2 = c2HO.getCellsUsed();
+      if (endi != end2) {
+        System.err.format("CASes have different heap cells used: %,d %,d%n", endi, end2);
+      }
+      c1heap = c1HO.heap;
+      c2heap = c2HO.heap;
+      
+      ComprItemRefs fsStartIndexes = new ComprItemRefs();
+      initFsStartIndexes(fsStartIndexes, c1heap, 1, endi, null);
+      
+      final int endsi = fsStartIndexes.getNbrOfItems();
+      for (int i = 1; i < endsi; i++) {
+        iHeap = fsStartIndexes.getItemAddr(i);
+//        System.out.println("");
+        if (!compareFss()) {
+          return false;
+        }
+      }
+      
+      int[] ifs1 = c1.getIndexedFSs();
+      int[] ifs2 = c2.getIndexedFSs();
+      
+      return Arrays.equals(ifs1, ifs2);
+    }
+
+    private boolean compareFss() {
+      int tCode = c1heap[iHeap];
+      typeInfo = getTypeInfo(tCode);
+      if (tCode != c2heap[iHeap]) {
+        return mismatchFs();
+      }
+      if (typeInfo.isArray) {
+        return compareFssArray();
+      } else {
+        for (int i = 1; i < typeInfo.slotKinds.length + 1; i++) {
+          if (!compareSlot(i)) {
+            return mismatchFs();
+          }
+        }
+        return true;
+      }
+    }
+      
+    private boolean compareFssArray() {
+      int len1 = c1heap[iHeap + 1];
+      int len2 = c2heap[iHeap + 1];
+      if (len1 != len2) {
+        return false;
+      }
+      for (int i = 0; i < len1; i++) {
+        SlotKind kind = typeInfo.getSlotKind(2);
+        if (typeInfo.isHeapStoredArray) {
+          if (kind == Slot_StrRef) {
+            if (! compareStrings(c1.getStringForCode(c1heap[iHeap + 2 + i]),
+                                 c2.getStringForCode(c2heap[iHeap + 2 + i]))) {
+              return mismatchFs();
+            }
+          } else if (c1heap[iHeap + 2 + i] != c2heap[iHeap + 2 + i]) {
+            return mismatchFs();
+          }
+        } else {  // not heap stored array
+          switch (kind) {
+          case Slot_BooleanRef: case Slot_ByteRef:
+            if (c1.getByteHeap().getHeapValue(c1heap[iHeap + 2] + i) !=
+                c2.getByteHeap().getHeapValue(c2heap[iHeap + 2] + i)) {
+              return mismatchFs(); 
+            }
+            break;
+          case Slot_ShortRef:
+            if (c1.getShortHeap().getHeapValue(c1heap[iHeap + 2] + i) !=
+                c2.getShortHeap().getHeapValue(c2heap[iHeap + 2] + i)) {
+              return mismatchFs();
+            }
+            break;
+          case Slot_LongRef: case Slot_DoubleRef: {
+            if (c1.getLongHeap().getHeapValue(c1heap[iHeap + 2] + i)  !=
+                c1.getLongHeap().getHeapValue(c1heap[iHeap + 2] + i)) {
+              return mismatchFs();
+            }
+            break;
+          }
+          default: throw new RuntimeException("internal error");
+          }
+        }
+      } // end of for
+      return true;
+    }
+    
+    private boolean compareSlot(int offset) {
+      SlotKind kind = typeInfo.getSlotKind(offset);
+      switch (kind) {
+      case Slot_Int: case Slot_Short: case Slot_Boolean: case Slot_Byte: 
+      case Slot_Float: case Slot_HeapRef:
+        return c1heap[iHeap + offset] == c2heap[iHeap + offset];
+      case Slot_StrRef:
+        return compareStrings(c1.getStringForCode(c1heap[iHeap + offset]),
+                              c2.getStringForCode(c2heap[iHeap + offset]));
+      case Slot_LongRef: case Slot_DoubleRef:
+        return c1.getLongHeap().getHeapValue(c1heap[iHeap + offset]) ==
+               c2.getLongHeap().getHeapValue(c2heap[iHeap + offset]);
+      default: throw new RuntimeException("internal error");      
+      }
+    }
+    
+    private boolean compareStrings(String s1, String s2) {
+      if ((null == s1) && (null == s2)) {
+        return true;
+      }
+      return s1.equals(s2);
+    }
+     
+    private boolean mismatchFs() {
+      System.err.format("Mismatched Feature Structures:%n %s%n %s%n", 
+          dumpHeapFs(c1), dumpHeapFs(c2));
+      return false;
+    }
+    
+    private StringBuilder dumpHeapFs(CASImpl cas) {
+      StringBuilder sb = new StringBuilder();
+      typeInfo = getTypeInfo(cas.getHeap().heap[iHeap]);
+      sb.append(typeInfo);
+  
+      if (typeInfo.isHeapStoredArray) {
+        sb.append(dumpHeapStoredArray(cas));
+      } else if (typeInfo.isArray) {

[... 448 lines stripped ...]