You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2013/03/05 16:44:14 UTC

svn commit: r1452857 - in /uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src: main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java main/java/org/apache/uima/cas/impl/CasSeqAddrMaps.java test/java/org/apache/uima/cas/impl/SerDesTest.java

Author: schor
Date: Tue Mar  5 15:44:14 2013
New Revision: 1452857

URL: http://svn.apache.org/r1452857
Log:
[UIMA-2498] bunch of updates needed to get new style of serialization working.  This style, instead of sequentially scanning the CAS heap and serializing out all the Feature Structures there, instead, starts from the index repositories for all views, and finds all "reachable" Feature Structures.  This is the same approach taken for xmi serialization, for example.  The result is that unreachable feature structures are not serialized (neither is data they reference in the various aux heaps and string heaps).  Lots of tricky issues involving treatment of delta cas serialization were debugged along the way.  I think this is now working, for equal type systems.  Next is to work on unequal type systems.

Modified:
    uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
    uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/CasSeqAddrMaps.java
    uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest.java

Modified: uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
URL: http://svn.apache.org/viewvc/uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java?rev=1452857&r1=1452856&r2=1452857&view=diff
==============================================================================
--- uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java (original)
+++ uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java Tue Mar  5 15:44:14 2013
@@ -19,6 +19,33 @@
 
 package org.apache.uima.cas.impl;
 
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.NBR_SLOT_KIND_ZIP_STREAMS;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_ArrayLength;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_BooleanRef;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Byte;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_ByteRef;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Control;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_DoubleRef;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Double_Exponent;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Double_Mantissa_Sign;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Float_Exponent;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Float_Mantissa_Sign;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_FsIndexes;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_HeapRef;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Int;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_LongRef;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Long_High;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Long_Low;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_MainHeap;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_Short;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_ShortRef;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_StrChars;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_StrLength;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_StrOffset;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_StrRef;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_StrSeg;
+import static org.apache.uima.cas.impl.SlotKinds.SlotKind.Slot_TypeCode;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
@@ -44,7 +71,6 @@ import org.apache.uima.cas.AbstractCas;
 import org.apache.uima.cas.CASRuntimeException;
 import org.apache.uima.cas.impl.SlotKinds.SlotKind;
 import org.apache.uima.cas.impl.TypeSystemImpl.TypeInfo;
-import org.apache.uima.internal.util.IntListIterator;
 import org.apache.uima.internal.util.IntPointerIterator;
 import org.apache.uima.internal.util.IntVector;
 import org.apache.uima.internal.util.rb_trees.IntArrayRBT;
@@ -54,8 +80,6 @@ import org.apache.uima.util.impl.DataIO;
 import org.apache.uima.util.impl.OptimizeStrings;
 import org.apache.uima.util.impl.SerializationMeasures;
 
-import static org.apache.uima.cas.impl.SlotKinds.SlotKind.*;
-
 /**
  * User callable serialization and deserialization of the CAS in a compressed Binary Format
  * 
@@ -69,11 +93,12 @@ import static org.apache.uima.cas.impl.S
  * How to Serialize:  
  * 
  * 1) create an instance of this class
+ *    a) if doing a delta serialization, pass in the mark and a ReuseInfo object that was created
+ *       after deserializing this CAS initially.
  * 2) call serialize() to serialize the CAS
  * 3) If doing serialization to a target from which you expect to receive back a delta CAS,
- *    keep this object and reuse it for deserializing the delta CAS.
+ *    create a ReuseInfo object from this object and reuse it for deserializing the delta CAS.
  *    
- * Otherwise, you cannot reuse this object.
  * 
  * TypeSystemImpl objects are lazily augmented by customized TypeInfo instances for each type encountered in 
  * serializing or deserializing.  These are preserved for future calls, so their setup / initialization is only
@@ -100,7 +125,8 @@ import static org.apache.uima.cas.impl.S
  *    
  * An alternative interface for Deserialization with type system mapping:
  * 1) get an appropriate CAS to deserialize into (as above).
- * 2) create (or reuse, if delta deserializing) an instance of this class -> xxx 
+ * 2) create an instance of this class -> xxx
+ *    a) if delta deserializing, pass in the ReuseInfo object created when the CAS was serialized 
  * 3) call xxx.deserialize(receivingCas, inputStream, targetTypeSystemImpl)
  *    where the targetTypeSystem is the type system of the cas being deserialized
  * 
@@ -125,7 +151,7 @@ import static org.apache.uima.cas.impl.S
  *   Defaulting:
  *     flags:  doMeasurements, compressLevel, CompressStrategy
  *   Per serialize call: cas, output, [target ts], [mark for delta]
- *   Per deserialize call: cas, input, [target ts]
+ *   Per deserialize call: cas, input, [target ts], whether-to-save-info-for-delta-serialization
  *   
  *   CASImpl has instance method with defaulting args for serialization.
  *   CASImpl has reinit which works with compressed binary serialization objects
@@ -137,27 +163,32 @@ import static org.apache.uima.cas.impl.S
  *   * (de)serialize * filter? * delta? * Use case
  *   **************************************************************************
  *   * serialize     *   N     *   N    * Saving a Cas, 
- *   *               *         *        * sending Cas to svc with identical ts
+ *   *               *         *        * sending Cas to service with identical ts
  *   **************************************************************************
- *   * serialize     *   Y     *   N    * sending Cas to svc with 
- *   *               *         *        * different ts (a subset)
+ *   * serialize     *   Y     *   N    * sending Cas to service with 
+ *   *               *         *        * different ts (a guaranteed subset)
  *   **************************************************************************
  *   * serialize     *   N     *   Y    * returning Cas to client
+ *   *               *         *        *   uses info saved when deserializing
  *   *               *         *        * (?? saving just a delta to disk??)
  *   **************************************************************************
  *   * serialize     *   Y     *   Y    * NOT SUPPORTED (not needed)  
  *   **************************************************************************
  *   * deserialize   *   N     *   N    * reading/(receiving) CAS, identical TS
  *   **************************************************************************
- *   * deserialize   *   Y     *   N    * reading/(receiving) CAS, different TS
+ *   * deserialize   *   Y     *   N    * reading/receiving CAS, different TS
+ *   *               *         *        * ts not guaranteed to be superset
  *   **************************************************************************
- *   * deserialize   *   N     *   Y    * receiving CAS, identical TS, 
+ *   * deserialize   *   N     *   Y    * receiving CAS, identical TS 
+ *   *               *         *        *   uses info saved when serializing
  *   **************************************************************************
  *   * deserialize   *   Y     *   Y    * receiving CAS, different TS (tgt a feature subset)
+ *   *               *         *        *   uses info saved when serializing
  *   **************************************************************************
  */
 public class BinaryCasSerDes6 {
 
+  private static final int[] INT0 = new int[0];
   /**
    * Version of the serializer/deserializer, used to allow deserialization of 
    * older versions
@@ -196,28 +227,42 @@ public class BinaryCasSerDes6 {
     }
   }
   /**
-   * Info reused for multiple serializations of same cas to multiple targets, or
-   * for deserializing with a delta cas.
+   * Info reused for 
+   *   1) multiple serializations of same cas to multiple targets (a speedup), or
+   *   2) for delta cas serialization, where it represents the fsStartIndex info before any mods
+   *      were done which could change that info, or 
+   *   3) for deserializing with a delta cas, where it represents the fsStartIndex info at the time
+   *      the CAS was serialized out..
    * Reachable FSs and Sequence maps
-   * Reuseable in serialization, and for deserialization of delta cas
    */
   public static class ReuseInfo {
-    final private IntArrayRBT foundFSs; // ordered set of FSs found in indexes or linked from other found FSs
+    /**
+     * kept here only to avoid recomputation in the use case:
+     *   serialize to target 1, serialize same to target 2, etc.
+     *   if Null, recomputed when needed
+     */
+    final private IntArrayRBT foundFSs;
+    final private int[] foundFSsArray; // ordered set of FSs found in indexes or linked from other found FSs
+    
+    /**
+     * Multiple uses:
+     *   a) avoid recomputation when multiple serializations of same CAS to multiple targets
+     *   b) remembers required mapping for processing delta cas serializations and deserializations conversion of tgt seq # to src addr
+     */
     final private CasSeqAddrMaps fsStartIndexes;
-    final private TypeSystemImpl tgtTs;
     
     private ReuseInfo(
-        IntArrayRBT foundFSs, 
-        CasSeqAddrMaps fsStartIndexes,
-        TypeSystemImpl tgtTs) {
+        IntArrayRBT foundFSs,
+        int[] foundFSsArray, 
+        CasSeqAddrMaps fsStartIndexes) {
       this.foundFSs = foundFSs;
+      this.foundFSsArray = foundFSsArray;
       this.fsStartIndexes = fsStartIndexes;
-      this.tgtTs = tgtTs;
     }
   }
   
   public ReuseInfo getReuseInfo() {
-    return new ReuseInfo(foundFSs, fsStartIndexes, tgtTs);
+    return new ReuseInfo(foundFSs, foundFSsArray, fsStartIndexes);
   }
     
   // speedups - ints for SlotKind ordinals
@@ -272,7 +317,11 @@ public class BinaryCasSerDes6 {
         private boolean isReadingDelta;
   final private MarkerImpl mark;  // the mark to serialize from
 
+  /**
+   * 
+   */
   final private CasSeqAddrMaps fsStartIndexes;
+  final private boolean reuseInfoProvided;
   final private boolean doMeasurements;  // if true, doing measurements
 
   private OptimizeStrings os;
@@ -288,8 +337,12 @@ public class BinaryCasSerDes6 {
   private int iPrevHeap;        // 0 or heap addr of previous instance of current type
 
   private IntArrayRBT foundFSs; // ordered set of FSs found in indexes or linked from other found FSs
+  private IntArrayRBT foundFSsBelowMark; // for delta serialization use only
+  private int[] foundFSsArray;  // sorted fss's being serialized.  For delta, just the deltas
+//  private HashSetInt ffssBelowMark;  // sorted fss's found below the mark
 //  final private int[] typeCodeHisto = new int[ts.getTypeArraySize()]; 
 
+  final private boolean debugEOF = false;
   /********************************* 
    * Things for just serialization  
    *********************************/
@@ -298,30 +351,30 @@ public class BinaryCasSerDes6 {
   final private SerializationMeasures sm;  // null or serialization measurements
   final private ByteArrayOutputStream[] baosZipSources = new ByteArrayOutputStream[NBR_SLOT_KIND_ZIP_STREAMS];  // lazily created, indexed by SlotKind.i
   final private DataOutputStream[] dosZipSources = new DataOutputStream[NBR_SLOT_KIND_ZIP_STREAMS];      // lazily created, indexed by SlotKind.i
+  private int[] savedAllIndexesFSs;
 
   final private int[] estimatedZipSize = new int[NBR_SLOT_KIND_ZIP_STREAMS]; // one entry for each output stream kind
-
   // speedups
   
   // any use of these means caller handles measurement
   // some of these are never used, because the current impl
   //   is using the _i form to get measurements done
-  private DataOutputStream arrayLength_dos;
-  private DataOutputStream heapRef_dos;
-  private DataOutputStream int_dos;
+//  private DataOutputStream arrayLength_dos;
+//  private DataOutputStream heapRef_dos;
+//  private DataOutputStream int_dos;
   private DataOutputStream byte_dos;
-  private DataOutputStream short_dos;
+//  private DataOutputStream short_dos;
   private DataOutputStream typeCode_dos;
   private DataOutputStream strOffset_dos;
   private DataOutputStream strLength_dos;
-  private DataOutputStream long_High_dos;
-  private DataOutputStream long_Low_dos;
+//  private DataOutputStream long_High_dos;
+//  private DataOutputStream long_Low_dos;
   private DataOutputStream float_Mantissa_Sign_dos;
   private DataOutputStream float_Exponent_dos;
   private DataOutputStream double_Mantissa_Sign_dos;
   private DataOutputStream double_Exponent_dos;
   private DataOutputStream fsIndexes_dos;
-  private DataOutputStream strChars_dos;
+//  private DataOutputStream strChars_dos;
   private DataOutputStream control_dos;
   private DataOutputStream strSeg_dos;
 
@@ -372,10 +425,17 @@ public class BinaryCasSerDes6 {
 
   /**
    * 
-   * @param ts Type System (the source type system)
-   * @param doMeasurements true if measurements should be collected
-   * @param compressLevel 
-   * @param compressStrategy
+   * @param aCas required - refs the CAS being serialized or deserialized into
+   * @param mark if not null is the serialization mark for delta serialization.  Unused for deserialization.
+   * @param tgtTs if not null is the target type system.  For serialization - this is a subset of the CASs TS
+   * @param rfs For delta serialization - must be not null, and the saved value after deserializing the original
+   *                                      before any modifications / additions made.
+   *            For normal serialization - can be null, but if not, is used in place of re-calculating, for speed up
+   *            For delta deserialization - must not be null, and is the saved value after serializing to the service
+   *            For normal deserialization - must be null
+   * @param doMeasurements if true, measurements are done (on serialization)
+   * @param compressLevel if not null, specifies enum instance for compress level
+   * @param compressStrategy if not null, specifies enum instance for compress strategy
    */
   public BinaryCasSerDes6(
       AbstractCas aCas,
@@ -415,14 +475,17 @@ public class BinaryCasSerDes6 {
 
     this.compressLevel = compressLevel;
     this.compressStrategy = compressStrategy;
-    if (null != rfs) {
+    reuseInfoProvided = (rfs != null);
+    if (reuseInfoProvided) {
       foundFSs = rfs.foundFSs;
-      fsStartIndexes = rfs.fsStartIndexes;
-      this.tgtTs = rfs.tgtTs;
+      foundFSsArray = rfs.foundFSsArray;
+      fsStartIndexes = rfs.fsStartIndexes.copy();
     } else {
+      foundFSs = null;
+      foundFSsArray = null;
       fsStartIndexes = new CasSeqAddrMaps();
-      this.tgtTs = tgtTs;
     }
+    this.tgtTs = tgtTs;
   }
   
   public BinaryCasSerDes6(AbstractCas cas) {
@@ -437,6 +500,15 @@ public class BinaryCasSerDes6 {
     this(cas, mark, tgtTs, null, false, CompressLevel.Default, CompressStrat.Default);
   }
 
+  public BinaryCasSerDes6(AbstractCas cas, MarkerImpl mark, TypeSystemImpl tgtTs, ReuseInfo rfs) {
+    this(cas, mark, tgtTs, rfs, false, CompressLevel.Default, CompressStrat.Default);
+  }
+  
+  public BinaryCasSerDes6(AbstractCas cas, MarkerImpl mark, TypeSystemImpl tgtTs, ReuseInfo rfs, boolean doMeasurements) {
+    this(cas, mark, tgtTs, rfs, doMeasurements, CompressLevel.Default, CompressStrat.Default);
+  }
+
+
   public BinaryCasSerDes6(AbstractCas cas, ReuseInfo rfs) {
     this(cas, null, null, rfs, false, CompressLevel.Default, CompressStrat.Default);
   }
@@ -457,6 +529,17 @@ public class BinaryCasSerDes6 {
     if (isSerializingDelta && (tgtTs != null)) {
       throw new UnsupportedOperationException("Can't do Delta Serialization with different target TS");
     }
+
+    if (fsStartIndexes == null) {
+      if (isSerializingDelta) {
+        throw new UnsupportedOperationException("Serializing a delta requires valid ReuseInfo for Cas being serialized," +
+        		" captured right after it was deserialized");
+      }
+      if (isReadingDelta) {
+        throw new UnsupportedOperationException("Deserializing a delta requires valid ReuseInfo for Cas being deserialized into");
+      }
+    }
+    
     setupOutputStreams(out);
     
     if (doMeasurements) {
@@ -473,12 +556,17 @@ public class BinaryCasSerDes6 {
     /******************************************************************
      * Find all FSs to be serialized via the indexes
      *   including those FSs referenced  
+     * For Delta Serialization - excludes those FSs below the line
      ******************************************************************/
     
-    if (foundFSs == null) {
+    if (!reuseInfoProvided || isSerializingDelta) {
+      long start = System.currentTimeMillis();
       processIndexedFeatureStructures(cas, false /* compute ref'd FSs, no write */);
+//      System.out.format("Time to enqueue reachable FSs: %,.3f seconds%n", (System.currentTimeMillis() - start)/ 1000f);
     }
     
+    
+    
     /***************************
      * Prepare to walk main heap
      * We prescan the main heap and
@@ -493,11 +581,22 @@ public class BinaryCasSerDes6 {
       // Also, compute sequential maps for non-equal type systems
       // As a side effect, also add all strings that are included
       // in the target type system to the set to be optimized.
+      //   Note: for delta cas, this only picks up strings 
+      //   referenced by FSs above the line
     totalMappedHeapSize = initFsStartIndexes();
     if (heapStart == 0) {
       totalMappedHeapSize++;  // include the null at the start
       heapStart = 1;  // slot 0 not serialized, it's null / 0
     }
+    
+    // add remaining strings for this case:
+    //   deltaCas, FS below the line modified, modification is new string.
+    //   use the deltaCasMod scanning
+    final SerializeModifiedFSs smfs = isSerializingDelta ? new SerializeModifiedFSs() : null;
+    if (isSerializingDelta) {
+      smfs.addModifiedStrings();
+    }
+    
 
     /**************************
      * Strings
@@ -520,11 +619,32 @@ public class BinaryCasSerDes6 {
      * walk main heap
      ***************************/
 
-    final IntListIterator foundFSsIterator = foundFSs.iterator();
     int iHeap;
-    while (foundFSsIterator.hasNext()) {
-      iHeap = foundFSsIterator.next();
-//    for (int iHeap = heapStart; iHeap < heapEnd; iHeap += incrToNextFs(heap, iHeap, typeInfo)) {
+    
+//    { // debug
+//      IntListIterator dit = foundFSs.iterator();
+//      int column = 0;
+//      int[] va = new int[100];
+//      while (dit.hasNext()) {
+//        va[column++] = dit.next();
+//        if (column == 100) {
+//          column = 0;
+//          for (int i = 0; i < 100; i++) {
+//            System.err.format("%,8d ", va[i]);
+//          }
+//          System.err.println("");
+//        }
+//      }
+//      for (int i = 0; i < column; i++) {
+//        System.err.format("%9d ", va[i]);
+//      }
+//      System.err.println("");
+//    }
+    for (int fssi = 0; fssi < foundFSsArray.length; fssi++) {
+      iHeap = foundFSsArray[fssi];
+      if (isDelta && iHeap < mark.nextFSId) {
+        continue;
+      }
       final int tCode = heap[iHeap];  // get type code
       final int mappedTypeCode = isTypeMapping ? typeMapper.mapTypeCodeSrc2Tgt(tCode) : tCode;
       if (mappedTypeCode == 0) { // means no corresponding type in target system
@@ -568,7 +688,7 @@ public class BinaryCasSerDes6 {
     processIndexedFeatureStructures(cas, true /* pass 2 */);
 
     if (isSerializingDelta) {
-      (new SerializeModifiedFSs()).serializeModifiedFSs();
+      smfs.serializeModifiedFSs();
     }
 
     collectAndZip();
@@ -776,16 +896,22 @@ public class BinaryCasSerDes6 {
       if (doMeasurements) {
         sm.statDetails[strLength_i].incr(1);
       }
+      if (debugEOF) {
+        System.out.format("writeString length null 0%n");
+      }
       return;
     } 
     
-    int indexOrSeq = os.getIndexOrSeqIndex(s);
+    final int indexOrSeq = os.getIndexOrSeqIndex(s);
     if (indexOrSeq < 0) {
       final int v = encodeIntSign(indexOrSeq);
       writeVnumber(strLength_dos, v);
       if (doMeasurements) {
         sm.statDetails[strLength_i].incr(DataIO.lengthVnumber(v));
       }
+      if (debugEOF) {
+        System.out.format("writeString length %d%n", indexOrSeq);
+      }
       return;
     }
     
@@ -794,6 +920,9 @@ public class BinaryCasSerDes6 {
       if (doMeasurements) {
         sm.statDetails[strLength_i].incr(1);
       }
+      if (debugEOF) {
+        System.out.format("writeString length 0 as 1%n");
+      }
       return;
     }
     
@@ -816,6 +945,10 @@ public class BinaryCasSerDes6 {
         sm.statDetails[strSeg_i].incr(DataIO.lengthVnumber(csi));
       }
     }
+    if (debugEOF) {
+      System.out.format("writeString length %,d offset %,d%n",
+          length, offset);
+    }
   }
 
   /**
@@ -1021,11 +1154,11 @@ public class BinaryCasSerDes6 {
    ******************************************************************************/
   public class SerializeModifiedFSs {
 
-    final int[] modifiedMainHeapAddrs = cas.getModifiedFSHeapAddrs().toArray();
-    final int[] modifiedFSs = cas.getModifiedFSList().toArray();
-    final int[] modifiedByteHeapAddrs = cas.getModifiedByteHeapAddrs().toArray();
-    final int[] modifiedShortHeapAddrs = cas.getModifiedShortHeapAddrs().toArray();
-    final int[] modifiedLongHeapAddrs = cas.getModifiedLongHeapAddrs().toArray();
+    final int[] modifiedMainHeapAddrs = toArrayOrINT0(cas.getModifiedFSHeapAddrs());
+    final int[] modifiedFSs = toArrayOrINT0(cas.getModifiedFSList());
+    final int[] modifiedByteHeapAddrs = toArrayOrINT0(cas.getModifiedByteHeapAddrs());
+    final int[] modifiedShortHeapAddrs = toArrayOrINT0(cas.getModifiedShortHeapAddrs());
+    final int[] modifiedLongHeapAddrs = toArrayOrINT0(cas.getModifiedLongHeapAddrs());
 
     {sortModifications();}  // a non-static initialization block
     
@@ -1053,9 +1186,30 @@ public class BinaryCasSerDes6 {
     int iHeap;
     TypeInfo typeInfo;
     
+    /**
+     * For Delta Serialization:
+     * Add any strings below the line 
+     * Assume: no TS mapping (because it's delta serialization)
+     */
+    private void addModifiedStrings() {
+//      System.out.println("Enter addModifiedStrings");
+      for (int i = 0; i < modFSsLength; i++) {
+        iHeap = modifiedFSs[i];
+        // skip if no longer indexed-reachable change
+        if (!foundFSsBelowMark.contains(iHeap)) {
+//          System.out.format("  skipping heap addr %,d%n", iHeap);
+          continue;        
+        }
+        final int tCode = heap[iHeap];
+        final TypeInfo typeInfo = ts.getTypeInfo(tCode);
+//        System.out.format("  maybe adding string ");
+        addStringFromFS(typeInfo, iHeap, tCode);
+      }
+//      System.out.println("Exit addModifiedStrings");
+    }
+    
     private void serializeModifiedFSs() throws IOException {
-      // write out number of modified Feature Structures
-      writeVnumber(control_dos, modFSsLength);
+      int skipped = 0;
       // iterate over all modified feature structures
       /**
        * Theorems about these data
@@ -1065,27 +1219,39 @@ public class BinaryCasSerDes6 {
        *   3) Assumption: String array element modifications are main heap slot changes
        *      and recorded as such
        */
-      
+      iPrevHeap = 0;
+      final int splitPoint = mark.nextFSId;
       for (int i = 0; i < modFSsLength; i++) {
-        iHeap = modifiedFSs[i];     
+        iHeap = modifiedFSs[i];
+        final boolean skipping = ((iHeap >= splitPoint) && !foundFSs.contains(iHeap)) ||
+                                 ((iHeap < splitPoint) && !foundFSsBelowMark.contains(iHeap));
         final int tCode = heap[iHeap];
         typeInfo = ts.getTypeInfo(tCode);
         
         // write out the address of the modified FS
         // will convert to seq# internally
-        writeDiff(fsIndexes_i, iHeap, iPrevHeap);
+        if (!skipping) {
+          writeDiff(fsIndexes_i, iHeap, iPrevHeap);
+        }
         // delay updating iPrevHeap until end of "for" loop
         
         /**************************************************
          * handle aux byte, short, long array modifications
          **************************************************/
         if (typeInfo.isArray && (!typeInfo.isHeapStoredArray)) {
-          writeAuxHeapMods();           
+          writeAuxHeapMods(skipping);           
         } else { 
-          writeMainHeapMods(); 
+          writeMainHeapMods(skipping); 
         }  // end of processing 1 modified FS
-        iPrevHeap = iHeap;
+        if (skipping) {
+          skipped ++;
+        } else {
+          iPrevHeap = iHeap;  // iPrevHeap must be 0 or point to a non-skipped one          
+        }
       }  // end of for loop over all modified FSs
+      // write out number of modified Feature Structures
+      writeVnumber(control_dos, modFSsLength - skipped);
+
     }  // end of method
     
     // sort and remove duplicates
@@ -1148,20 +1314,29 @@ public class BinaryCasSerDes6 {
       return i;
     }
     
-    private void writeMainHeapMods() throws IOException {
+    private void writeMainHeapMods(final boolean skipping) throws IOException {
       final int fsLength = incrToNextFs(heap, iHeap, typeInfo);
       final int numberOfModsInFs = countModifiedSlotsInFs(fsLength);
-      writeVnumber(fsIndexes_dos, numberOfModsInFs);
+      if (!skipping) {
+        writeVnumber(fsIndexes_dos, numberOfModsInFs);
+      }
       int iPrevOffsetInFs = 0;
 
       for (int i = 0; i < numberOfModsInFs; i++) {
         final int nextMainHeapIndex = modifiedMainHeapAddrs[imaModMainHeap++];
+        if (skipping) {
+          continue;
+        }
         final int offsetInFs = nextMainHeapIndex - iHeap;
         
         writeVnumber(fsIndexes_dos, offsetInFs - iPrevOffsetInFs);
         iPrevOffsetInFs = offsetInFs;
         
+//        if (typeInfo.isArray && (typeInfo.getSlotKind(2) == Slot_StrRef)) {
+//          System.out.println("writing string array mod");
+//        }
         final SlotKind kind = typeInfo.getSlotKind(typeInfo.isArray ? 2 : offsetInFs);
+//        System.out.format("mainHeapModWrite type: %s slot: %s%n", typeInfo, kind);
         
         switch (kind) {
         case Slot_HeapRef:
@@ -1195,7 +1370,7 @@ public class BinaryCasSerDes6 {
       }  // end of looping for all modified slots in this FS
     }
     
-    private void writeAuxHeapMods() throws IOException {
+    private void writeAuxHeapMods(final boolean skipping) throws IOException {
       final int auxHeapIndex = heap[iHeap + 2];
       int iPrevOffsetInAuxArray = 0;
       
@@ -1204,7 +1379,7 @@ public class BinaryCasSerDes6 {
       final boolean isAuxShort = (kind == Slot_ShortRef);
       final boolean isAuxLong = ((kind == Slot_LongRef) || (kind == Slot_DoubleRef));
       
-      if (!(isAuxByte | isAuxShort | isAuxLong)) {
+      if (!(isAuxByte || isAuxShort || isAuxLong)) {
         throw new RuntimeException();  // never happen
       }
       
@@ -1215,32 +1390,35 @@ public class BinaryCasSerDes6 {
                                         isAuxShort ? modShortHeapAddrsLength :
                                                      modLongHeapAddrsLength;
       int imaModXxxRef = isAuxByte  ? imaModByteRef :
-                               isAuxShort ? imaModShortRef : 
-                                            imaModLongRef;
+                         isAuxShort ? imaModShortRef : 
+                                      imaModLongRef;
       
       final int numberOfModsInAuxHeap = countModifiedSlotsInAuxHeap(modXxxHeapAddrs, imaModXxxRef, modXxxHeapAddrsLength);
-      writeVnumber(fsIndexes_dos, numberOfModsInAuxHeap);
+      if (!skipping) {
+        writeVnumber(fsIndexes_dos, numberOfModsInAuxHeap);
+      }
       
       for (int i = 0; i < numberOfModsInAuxHeap; i++) {
         final int nextModAuxIndex = modXxxHeapAddrs[imaModXxxRef++];
         final int offsetInAuxArray = nextModAuxIndex - auxHeapIndex;
-        
-        writeVnumber(fsIndexes_dos, offsetInAuxArray - iPrevOffsetInAuxArray);
-        iPrevOffsetInAuxArray = offsetInAuxArray;
-        
-        if (isAuxByte) {
-          writeUnsignedByte(byte_dos, byteHeapObj.getHeapValue(nextModAuxIndex));
-        } else if (isAuxShort) {
-          final short v = shortHeapObj.getHeapValue(nextModAuxIndex);
-          writeDiff(int_i, v, vPrevModShort);
-          vPrevModShort = v;
-        } else {
-          long v = longHeapObj.getHeapValue(nextModAuxIndex);
-          if (kind == Slot_LongRef) {
-            writeLong(v, vPrevModLong);
-            vPrevModLong = v;    
+        if (!skipping) {
+          writeVnumber(fsIndexes_dos, offsetInAuxArray - iPrevOffsetInAuxArray);
+          iPrevOffsetInAuxArray = offsetInAuxArray;
+          
+          if (isAuxByte) {
+            writeUnsignedByte(byte_dos, byteHeapObj.getHeapValue(nextModAuxIndex));
+          } else if (isAuxShort) {
+            final short v = shortHeapObj.getHeapValue(nextModAuxIndex);
+            writeDiff(int_i, v, vPrevModShort);
+            vPrevModShort = v;
           } else {
-            writeDouble(v);
+            long v = longHeapObj.getHeapValue(nextModAuxIndex);
+            if (kind == Slot_LongRef) {
+              writeLong(v, vPrevModLong);
+              vPrevModLong = v;    
+            } else {
+              writeDouble(v);
+            }
           }
         }
         
@@ -1267,7 +1445,7 @@ public class BinaryCasSerDes6 {
     readHeader();
 
     if (isReadingDelta) {
-      if (null == foundFSs) {
+      if (!reuseInfoProvided) {
         throw new UnsupportedOperationException("Deserializing Delta Cas, but original not serialized from");
       }
     } else {
@@ -1330,7 +1508,7 @@ public class BinaryCasSerDes6 {
 
     if (isReadingDelta) {
       // scan current source being added to / merged into
-      if (fsStartIndexes.getNumberSrcFss() == 1) {
+      if (!reuseInfoProvided) {
         throw new IllegalStateException("Reading Delta into CAS not serialized from");
       }
     }
@@ -1538,7 +1716,7 @@ public class BinaryCasSerDes6 {
       }
       break;
     case Slot_LongRef: {
-      long v = readLong(kind, (iPrevHeap == 0) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
+      long v = readLongOrDouble(kind, (iPrevHeap == 0) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
       if (v == 0L) {
         if (longZeroIndex == -1) {
           longZeroIndex = longHeapObj.addLong(0L);
@@ -1678,11 +1856,15 @@ public class BinaryCasSerDes6 {
       final int endPos = startPos + length;
       long prev = 0;
       for (int i = startPos; i < endPos; i++) {
-        h[i] = prev = readLong(kind, prev);
+        h[i] = prev = readLongOrDouble(kind, prev);
       }
       return startPos;
     } else {
-      skipLong(length);
+      if (kind == Slot_LongRef) {
+        skipLong(length);
+      } else {
+        skipDouble(length);
+      }
       return 0;
     }
   }
@@ -1722,7 +1904,7 @@ public class BinaryCasSerDes6 {
 
   }
       
-  private long readLong(SlotKind kind, long prev) throws IOException {
+  private long readLongOrDouble(SlotKind kind, long prev) throws IOException {
     if (kind == Slot_DoubleRef) {
       return readDouble();
     }
@@ -1733,12 +1915,18 @@ public class BinaryCasSerDes6 {
     return v;
   }
   
-  private void skipLong(int length) throws IOException {
+  private void skipLong(final int length) throws IOException {
     for (int i = 0; i < length; i++) {
       long_High_dis.skipBytes(8);
       long_Low_dis.skipBytes(8);
     }
   }
+  
+  private void skipDouble(final int length) throws IOException {
+    for (int i = 0; i < length; i++) {
+      readDouble();
+    }
+  }
      
   private int readFloat() throws IOException {
     final int exponent = readVnumber(float_Exponent_dis);  
@@ -1790,7 +1978,10 @@ public class BinaryCasSerDes6 {
   }
     
   private int readString(boolean storeIt) throws IOException {
-    int length = decodeIntSign(readVnumber(strLength_dis));
+    final int length = decodeIntSign(readVnumber(strLength_dis));
+    if (debugEOF) {
+//      System.out.format("readString length = %,d%n", length);
+    }
     if (0 == length) {
       return 0;
     }
@@ -1809,9 +2000,12 @@ public class BinaryCasSerDes6 {
         return 0;
       }
     }
-    int offset = readVnumber(strOffset_dis);
-    int segmentIndex = (only1CommonString) ? 0 :
+    final int offset = readVnumber(strOffset_dis);
+    final int segmentIndex = (only1CommonString) ? 0 :
       readVnumber(strSeg_dis);
+    if (debugEOF) {
+      System.out.format("readString offset = %,d%n", offset);
+    }
     if (storeIt) {
       String s =  readCommonString[segmentIndex].substring(offset, offset + length - 1);
       return stringHeapObj.addString(s);
@@ -1927,7 +2121,7 @@ public class BinaryCasSerDes6 {
           vPrevModShort = v;
           shortHeapObj.setHeapValue(v, auxHeapIndex + offset);
         } else {
-          final long v = readLong(kind, vPrevModLong);
+          final long v = readLongOrDouble(kind, vPrevModLong);
           if (kind == Slot_LongRef) {
             vPrevModLong = v;
           }
@@ -1937,6 +2131,7 @@ public class BinaryCasSerDes6 {
     }
     
     private void readModifiedMainHeap(int numberOfMods) throws IOException {
+      
       int iPrevTgtOffsetInFs = 0;
       for (int i = 0; i < numberOfMods; i++) {
         final int tgtOffsetInFs = readVnumber(fsIndexes_dis) + iPrevTgtOffsetInFs;
@@ -1948,7 +2143,7 @@ public class BinaryCasSerDes6 {
           throw new RuntimeException();
         }
         final SlotKind kind = typeInfo.getSlotKind(typeInfo.isArray ? 2 : srcOffsetInFs);
-        
+//        System.out.format("mainHeapModRead type: %s slot: %s%n", typeInfo, kind);
         switch (kind) {
         case Slot_HeapRef: {
             final int tgtSeq = readDiff(heapRef_dis, prevModHeapRefTgtSeq);
@@ -1969,11 +2164,14 @@ public class BinaryCasSerDes6 {
             heap[iHeap + srcOffsetInFs] = v;
           }
           break;
-        case Slot_LongRef: case Slot_DoubleRef: {
-            final long v = readLong(kind, vPrevModLong);
-            if (kind == Slot_LongRef) {
-              vPrevModLong = v;
-            }
+        case Slot_LongRef: {
+            final long v = readLongOrDouble(kind, vPrevModLong);
+            vPrevModLong = v;
+            heap[iHeap + srcOffsetInFs] = longHeapObj.addLong(v);
+          }
+          break;
+        case Slot_DoubleRef: {
+            final long v = readDouble();
             heap[iHeap + srcOffsetInFs] = longHeapObj.addLong(v);
           }
           break;
@@ -2013,6 +2211,7 @@ public class BinaryCasSerDes6 {
    *   of all FSs that are to be serialized:
    *     because they are in some index, or
    *     are pointed to by something that is in some index (recursively)
+   *   excluding those below the mark
    *   
    *   The second thing is to serialize out the index information.
    *   This step has to wait until the first time call has completed and 
@@ -2024,9 +2223,20 @@ public class BinaryCasSerDes6 {
    */
   private void processIndexedFeatureStructures(CASImpl cas, boolean isWrite) throws IOException {
     if (!isWrite) {
-      foundFSs = new IntArrayRBT();
+      // size the foundFSs at about 1/8 the guessed size
+      foundFSs = new IntArrayRBT(Math.max(1024, cas.getHeap().getCellsUsed() >> 6));
+      foundFSsBelowMark = isSerializingDelta ? new IntArrayRBT() : null;
+    }
+    final int[] fsIndexes = isWrite ? 
+                              // this alternative collects just the new FSs above the line
+                              (isSerializingDelta ? cas.getDeltaIndexedFSs(mark) : savedAllIndexesFSs) :
+                              // this alternative picks up the following use case:
+                              //   A modification of something below the line now has a new fs ref to something
+                              //   above the line, not otherwise referenced
+                                cas.getIndexedFSs();  
+    if (!isWrite) {
+      savedAllIndexesFSs = fsIndexes;
     }
-    final int[] fsIndexes = isSerializingDelta ? cas.getDeltaIndexedFSs(mark) : cas.getIndexedFSs();
     final int nbrViews = fsIndexes[0];
     final int nbrSofas = fsIndexes[1];
 
@@ -2058,22 +2268,34 @@ public class BinaryCasSerDes6 {
         enqueueFS(foundFSs, addrSofaFs);
       }
     }
-     
+    
+    heap = cas.getHeap().heap;   // referred to in processFsxPart
     for (int vi = 0; vi < nbrViews; vi++) {
       fi = processFsxPart(fsIndexes, fi, foundFSs, isWrite);    // added FSs
       if (isWrite && isSerializingDelta) {
-        fi = processFsxPart(fsIndexes, fi, null, false);  // removed FSs
-        fi = processFsxPart(fsIndexes, fi, null, false);  // reindexed FSs
+        fi = processFsxPart(fsIndexes, fi, null, true);  // removed FSs
+        fi = processFsxPart(fsIndexes, fi, null, true);  // reindexed FSs
       }
     } 
+    if (!isWrite) {
+      final IntPointerIterator foundFSsIteratorx = foundFSs.pointerIterator();
+      foundFSsIteratorx.moveToFirst();
+      final int fsslen = foundFSs.size();
+      foundFSsArray = new int[fsslen];
+      for (int i = 0; i < fsslen; i++) {
+        foundFSsArray[i] = foundFSsIteratorx.get();
+        foundFSsIteratorx.inc();
+      }
+//      Arrays.sort(foundFSsArray);
+    }
     return;
   }
 
   private int processFsxPart(
-      int[] fsIndexes, 
-      int fsNdxStart,
-      IntArrayRBT foundFSs, 
-      boolean isWrite) throws IOException {
+      final int[] fsIndexes, 
+      final int fsNdxStart,
+      final IntArrayRBT foundFSs, 
+      final boolean isWrite) throws IOException {
     int ix = fsNdxStart;
     final int nbrEntries = fsIndexes[ix++];
     final int end = ix + nbrEntries;
@@ -2128,12 +2350,20 @@ public class BinaryCasSerDes6 {
       return;
     }
     if (0 != fsAddr) {
-      if (!foundFSs.containsKey(fsAddr)) {
-        if (!isSerializingDelta || mark.isNew(fsAddr)) {
-          foundFSs.insertKey(fsAddr);
+      boolean added;
+//      if (!foundFSs.contains(fsAddr) && 
+//          ((foundFSsBelowMark == null) || 
+//           (fsAddr >= heapStart) || 
+//           !foundFSsBelowMark.contains(fsAddr))) {
+        if (fsAddr >= heapStart) { // don't add items below the line, but do scan their features
+          added = foundFSs.addAdded(fsAddr);
+        } else {
+          added = foundFSsBelowMark.addAdded(fsAddr);
+        }
+        if (added) {
           enqueueFeatures(foundFSs, fsAddr);
         }
-      }
+//      }
     }
   }
   
@@ -2182,9 +2412,8 @@ public class BinaryCasSerDes6 {
   
   /**
    * Serializing:
-   *   Called at beginning, scans whole CAS
-   * Deserializing (Delta Cas only):
-   *   Called at beginning if doing delta CAS, scans old CAS up to mark
+   *   Called at beginning of serialize, scans whole CAS or just delta CAS
+   *   If doing delta serialization, fsStartIndexes is passed in, pre-initialized with a copy of the map info below the line.
    * @param fsStartIndexes
    * @param srcHeap
    * @param srcHeapStart
@@ -2198,14 +2427,17 @@ public class BinaryCasSerDes6 {
    
     final boolean isTypeMapping = isTypeMappingCmn;
     final CasTypeSystemMapper typeMapper = typeMapperCmn;
-    final IntListIterator foundFSsIterator = foundFSs.iterator();
     
     int tgtHeapUsed = 0;
-    int nextTgtHeap = 1;
-    int markStringHeap = (isDelta) ? mark.getNextStringHeapAddr() : 0;
+    int nextTgtHeap = isSerializingDelta ? mark.nextFSId : 1;
     
-    while (foundFSsIterator.hasNext()) {
-      final int iSrcHeap = foundFSsIterator.next();
+    // for delta serialization - the iterator is only for things above the line.
+
+    for (int i = 0; i < foundFSsArray.length; i++) {
+      final int iSrcHeap = foundFSsArray[i];
+      // for delta serialization, no type mapping is supported, 
+      // however, some created FSs above the line may not be "reachable" and 
+      // therefore, skipped. 
       final int iTgtHeap = nextTgtHeap;
       final int tCode = heap[iSrcHeap];
       final int tgtTypeCode = isTypeMapping ? typeMapper.mapTypeCodeSrc2Tgt(tCode) : tCode;
@@ -2228,50 +2460,63 @@ public class BinaryCasSerDes6 {
       
       // add strings for included types (only when serializing)
       if (isIncludedType && (os != null)) { 
-
+        // skip if delta and fs is below the line
+        // Well, we can't do that - it may be that the fs is below the line, but the string slot 
+        //   has been updated  (modified).
+        // Well, this code is only called for FSs above the line... So need another method
+        //   to pick up those modified strings - see addModifiedStrings;
+            
         // next test only true if tgtTypeInfo.slotKinds[1] == Slot_StrRef
         // because this is the built-in type string array which is final
-        if (srcTypeInfo.isHeapStoredArray && (srcTypeInfo.slotKinds[1] == Slot_StrRef)) {
-          for (int i = 0; i < heap[iSrcHeap + 1]; i++) {
-            // this bit of strange logic depends on the fact that all new and updated strings
-            // are "added" at the end of the string heap in the current impl
-            final int strHeapIndex = heap[iSrcHeap + 2 + i];
-            if (strHeapIndex >= markStringHeap) {
-              os.add(stringHeapObj.getStringForCode(strHeapIndex));
-            }
-          }
-        } else {
-          final int[] strOffsets = srcTypeInfo.strRefOffsets;
-          final boolean[] fSrcInTgt = isTypeMapping ? typeMapper.getFSrcInTgt(tCode) : null;
-          for (int i = 0; i < strOffsets.length; i++ ) {
-            int srcOffset = strOffsets[i];  // offset to slot having str ref
-            // add only those strings in slots that are in target
-            if (!isTypeMapping || fSrcInTgt[srcOffset]) {
-              final int strHeapIndex = heap[iSrcHeap + strOffsets[i]];
-              // this bit of strange logic depends on the fact that all new and updated strings
-              // are "added" at the end of the string heap in the current impl
-              if (strHeapIndex >= markStringHeap) {
-                os.add(stringHeapObj.getStringForCode(strHeapIndex));
-              }
-            }
-          }
-        }
+        addStringFromFS(srcTypeInfo, iSrcHeap, tCode);
       }
             
       // Advance to next Feature Structure, in both source and target heap frame of reference
       if (isIncludedType) {
         final int deltaTgtHeap = incrToNextFs(heap, iSrcHeap, tgtTypeInfo);
         nextTgtHeap += deltaTgtHeap;
-        if (iSrcHeap >= heapStart) {  // don't use up tgt heap if delta, and below the mark
+//        if (iSrcHeap >= heapStart) {  // don't use up tgt heap if delta, and below the mark
+                                        // with current design is always true.
           tgtHeapUsed += deltaTgtHeap;
-        }
+//        }
       }
     }
     
     return tgtHeapUsed;  // side effect: set up fsStartIndexes
   } 
 
-
+  private void addStringFromFS(TypeInfo srcTypeInfo, int iSrcHeap, int tCode) {
+    final int markStringHeap = (isDelta) ? mark.getNextStringHeapAddr() : 0;
+    if (srcTypeInfo.isHeapStoredArray && (srcTypeInfo.slotKinds[1] == Slot_StrRef)) {
+      for (int i = 0; i < heap[iSrcHeap + 1]; i++) {
+        // this bit of strange logic depends on the fact that all new and updated strings
+        // are "added" at the end of the string heap in the current impl
+        final int strHeapIndex = heap[iSrcHeap + 2 + i];
+        if (strHeapIndex >= markStringHeap) {
+          os.add(stringHeapObj.getStringForCode(strHeapIndex));
+//          System.out.format("addStringFromFS:  %s%n", stringHeapObj.getStringForCode(strHeapIndex));
+        } else {
+//          System.out.format("addStringFromFS: skipping add of str %s%n", stringHeapObj.getStringForCode(strHeapIndex));
+        }
+        
+      }
+    } else {
+      final int[] strOffsets = srcTypeInfo.strRefOffsets;
+      final boolean[] fSrcInTgt = isTypeMapping ? typeMapper.getFSrcInTgt(tCode) : null;
+      for (int i = 0; i < strOffsets.length; i++ ) {
+        int srcOffset = strOffsets[i];  // offset to slot having str ref
+        // add only those strings in slots that are in target
+        if (!isTypeMapping || fSrcInTgt[srcOffset]) {
+          final int strHeapIndex = heap[iSrcHeap + strOffsets[i]];
+          // this bit of strange logic depends on the fact that all new and updated strings
+          // are "added" at the end of the string heap in the current impl
+          if (strHeapIndex >= markStringHeap) {
+            os.add(stringHeapObj.getStringForCode(strHeapIndex));
+          }
+        }
+      }
+    }
+  }
   /**
    * Compare 2 CASes, with perhaps different type systems.
    * If the type systems are different, construct a type mapper and use that
@@ -2326,34 +2571,29 @@ public class BinaryCasSerDes6 {
     }
       
     public boolean compareCASes() {
-      IntArrayRBT c1FoundFSs;
-      IntArrayRBT c2FoundFSs;
+      final int[] c1FoundFSs;
+      final int[] c2FoundFSs;
       try {
+        heapStart = 0;  // referenced by the following method
         processIndexedFeatureStructures(c1, false);
-        c1FoundFSs = foundFSs;
-        foundFSs = new IntArrayRBT();
+        c1FoundFSs = foundFSsArray;
         processIndexedFeatureStructures(c2, false);
-        c2FoundFSs = foundFSs;
+        c2FoundFSs = foundFSsArray;
       } catch (IOException e) {
         throw new RuntimeException(e);  // never happen
       }
-      IntPointerIterator c1Iterator = c1FoundFSs.pointerIterator();
-      IntPointerIterator c2Iterator = c2FoundFSs.pointerIterator();
 
-      int i = 0;
-      while (c1Iterator.isValid()) {
-        addr2seq1.put(c1Iterator.get(), i++);
-        c1Iterator.inc();
-      }
-      i = 0;
-      while (c2Iterator.isValid()) {
-        addr2seq2.put(c2Iterator.get(), i++);
-        c2Iterator.inc();
+      for (int i = 0; i < c1FoundFSs.length; i++) {
+        final int v = c1FoundFSs[i];
+//        System.out.format("compare 1: seq = %,d addr=%,d%n", i, v);
+        addr2seq1.put(v, i);
+      }
+      for (int i = 0; i < c2FoundFSs.length; i++) {
+        final int v = c2FoundFSs[i];
+//        System.out.format("compare 2: seq = %,d addr=%,d%n", i, v);
+        addr2seq2.put(v, i);
       }
-      
-      c1Iterator.moveToFirst();
-      c2Iterator.moveToFirst();
-      
+            
 //      initFsStartIndexesCompare();
       
       // Iterating over both CASes
@@ -2370,9 +2610,12 @@ public class BinaryCasSerDes6 {
 //      boolean pastEnd1 = false;
 //      boolean pastEnd2 = false;      
       
-      while (c1Iterator.isValid() && c2Iterator.isValid()) {
-        c1heapIndex = c1Iterator.get();
-        c2heapIndex = c2Iterator.get();
+//      while (c1Iterator.isValid() && c2Iterator.isValid()) {
+      int i1 = 0;
+      int i2 = 0;
+      while (i1 < c1FoundFSs.length && i2 < c2FoundFSs.length) {
+        c1heapIndex = c1FoundFSs[i1];
+        c2heapIndex = c2FoundFSs[i2];
         if (isTypeMapping) {
           final int tCode1_2 = typeMapper.mapTypeCodeSrc2Tgt(c1heap[c1heapIndex]);
           final int tCode2_1 = typeMapper.mapTypeCodeTgt2Src(c2heap[c2heapIndex]);
@@ -2380,50 +2623,50 @@ public class BinaryCasSerDes6 {
             if (!compareFss()) {
               return false;
             }
-            c1Iterator.inc();
-            c2Iterator.inc();
+            i1++;
+            i2++;
             continue;
           }
           if ((tCode1_2 == 0) && (tCode2_1 == 0)) {
-            c1Iterator.inc();
-            c2Iterator.inc();
+            i1++;
+            i2++;
             continue;
           }
           if ((tCode1_2 == 0) && (tCode2_1 != 0)) {
-            c2Iterator.inc();
+            i2++;
             continue;
           }
           if ((tCode1_2 != 0) && (tCode2_1 == 0)) {
-            c1Iterator.inc();
+            i1++;
             continue;
           }
         } else {  // not type mapping
           if (!compareFss()) {
             return false;
           }
-          c1Iterator.inc();
-          c2Iterator.inc();
+          i1++;
+          i2++;
           continue;
         }
       }
       
-      if (!c1Iterator.isValid() && !c2Iterator.isValid()) {
+      if (i1 >= c1FoundFSs.length && i1 >= c2FoundFSs.length) {
         return true;  // end, everything compared
       }
       if (isTypeMapping) {
-        while (c1Iterator.isValid()) {
-          c1heapIndex = c1Iterator.get();
+        while (i1 < c1FoundFSs.length) {
+          c1heapIndex = c1FoundFSs[i1];
           if (typeMapper.mapTypeCodeSrc2Tgt(c1heap[c1heapIndex]) != 0) {
             return false;  // have more FSs in c1 than in c2
           }
-          c1Iterator.inc();
+          i1++;
         }
-        while (c2Iterator.isValid()) {
-          c2heapIndex = c2Iterator.get();
+        while (i2 < c2FoundFSs.length) {
+          c2heapIndex = c2FoundFSs[i2];
           if (typeMapper.mapTypeCodeTgt2Src(c2heap[c2heapIndex]) != 0) {
             return false;  // have more FSs in c2 than in c1
           }
-          c1Iterator.inc();
+          i2++;
         }
       }
       return true;
@@ -2451,7 +2694,7 @@ public class BinaryCasSerDes6 {
       int len1 = c1heap[c1heapIndex + 1];
       int len2 = c2heap[c2heapIndex + 1];
       if (len1 != len2) {
-        return false;
+        return mismatchFs();
       }
       for (int i = 0; i < len1; i++) {
         SlotKind kind = typeInfo.getSlotKind(2);
@@ -2462,8 +2705,11 @@ public class BinaryCasSerDes6 {
               return mismatchFs();
             }
           } else if (kind == Slot_HeapRef) {
-            if (addr2seq1.get(c1heap[c1heapIndex + 2 + i]) != 
-                addr2seq2.get(c2heap[c2heapIndex + 2 + i])) {
+            final int c1ref = c1heap[c1heapIndex + 2 + i];
+            final int c2ref = c2heap[c2heapIndex + 2 + i];
+            if ((c1ref != 0) && 
+                (c2ref != 0) && 
+                (addr2seq1.get(c1ref) != addr2seq2.get(c2ref))) {
               return mismatchFs();
             }
           } else if (c1heap[c1heapIndex + 2 + i] != c2heap[c2heapIndex + 2 + i]) {
@@ -2485,7 +2731,7 @@ public class BinaryCasSerDes6 {
             break;
           case Slot_LongRef: case Slot_DoubleRef: {
             if (c1.getLongHeap().getHeapValue(c1heap[c1heapIndex + 2] + i)  !=
-                c1.getLongHeap().getHeapValue(c1heap[c2heapIndex + 2] + i)) {
+                c2.getLongHeap().getHeapValue(c2heap[c2heapIndex + 2] + i)) {
               return mismatchFs();
             }
             break;
@@ -2503,9 +2749,13 @@ public class BinaryCasSerDes6 {
       case Slot_Int: case Slot_Short: case Slot_Boolean: case Slot_Byte: 
       case Slot_Float: 
         return c1heap[c1heapIndex + offset] == c2heap[c2heapIndex + offset];
-      case Slot_HeapRef:
-        return addr2seq1.get(c1heap[c1heapIndex + offset]) ==
-               addr2seq2.get(c2heap[c2heapIndex + offset]);
+      case Slot_HeapRef: {
+        final int c1ref = c1heap[c1heapIndex + offset];
+        final int c2ref = c2heap[c2heapIndex + offset];
+        return ((c1ref == 0) && (c2ref == 0)) ||
+               ((c1ref != 0) && (c2ref != 0) && 
+                (addr2seq1.get(c1ref) == addr2seq2.get(c2ref)));
+      }
       case Slot_StrRef:
         return compareStrings(c1.getStringForCode(c1heap[c1heapIndex + offset]),
                               c2.getStringForCode(c2heap[c2heapIndex + offset]));
@@ -2520,6 +2770,9 @@ public class BinaryCasSerDes6 {
       if ((null == s1) && (null == s2)) {
         return true;
       }
+      if (null == s1) {
+        return false;
+      }
       return s1.equals(s2);
     }
      
@@ -2580,7 +2833,7 @@ public class BinaryCasSerDes6 {
     private StringBuilder dumpHeapFs(CASImpl cas, final int iHeap, final TypeSystemImpl ts) {
       StringBuilder sb = new StringBuilder();
       typeInfo = ts.getTypeInfo(cas.getHeap().heap[iHeap]);
-      sb.append(typeInfo);
+      sb.append(typeInfo).append(' ');
   
       if (typeInfo.isHeapStoredArray) {
         sb.append(dumpHeapStoredArray(cas, iHeap));
@@ -2779,22 +3032,22 @@ public class BinaryCasSerDes6 {
     }
     // below must follow the setupOutputStream calls above
     
-    arrayLength_dos = dosZipSources[arrayLength_i];
-    heapRef_dos = dosZipSources[heapRef_i];
-    int_dos = dosZipSources[int_i];
+//    arrayLength_dos = dosZipSources[arrayLength_i];
+//    heapRef_dos = dosZipSources[heapRef_i];
+//    int_dos = dosZipSources[int_i];
     byte_dos = dosZipSources[byte_i];
-    short_dos = dosZipSources[short_i];
+//    short_dos = dosZipSources[short_i];
     typeCode_dos = dosZipSources[typeCode_i];
     strOffset_dos = dosZipSources[strOffset_i];
     strLength_dos = dosZipSources[strLength_i];
-    long_High_dos = dosZipSources[long_High_i];
-    long_Low_dos = dosZipSources[long_Low_i];
+//    long_High_dos = dosZipSources[long_High_i];
+//    long_Low_dos = dosZipSources[long_Low_i];
     float_Mantissa_Sign_dos = dosZipSources[float_Mantissa_Sign_i];
     float_Exponent_dos = dosZipSources[float_Exponent_i];
     double_Mantissa_Sign_dos = dosZipSources[double_Mantissa_Sign_i];
     double_Exponent_dos = dosZipSources[double_Exponent_i];
     fsIndexes_dos = dosZipSources[fsIndexes_i];
-    strChars_dos = dosZipSources[strChars_i];
+//    strChars_dos = dosZipSources[strChars_i];
     control_dos = dosZipSources[control_i];
     strSeg_dos = dosZipSources[strSeg_i];
   } 
@@ -2988,4 +3241,11 @@ public class BinaryCasSerDes6 {
     }
 
   }
+  
+  private int[] toArrayOrINT0(IntVector v) {
+    if (null ==  v) {
+      return INT0;
+    }
+    return v.toArray();
+  }
 }

Modified: uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/CasSeqAddrMaps.java
URL: http://svn.apache.org/viewvc/uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/CasSeqAddrMaps.java?rev=1452857&r1=1452856&r2=1452857&view=diff
==============================================================================
--- uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/CasSeqAddrMaps.java (original)
+++ uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/main/java/org/apache/uima/cas/impl/CasSeqAddrMaps.java Tue Mar  5 15:44:14 2013
@@ -50,7 +50,7 @@ public class CasSeqAddrMaps {
    *      
    * First seq number is 0.
    */
-  final private IntVector tgtSeq2SrcAddr = new IntVector();
+  final private IntVector tgtSeq2SrcAddr;
   
 //  /**
 //   * (Not Used, currently)
@@ -69,7 +69,7 @@ public class CasSeqAddrMaps {
    * map from source address to target sequence number.
    * if source is not in target, value = -1;
    */
-  final private IntRedBlackTree srcAddr2TgtSeq = new IntRedBlackTree();
+  final private IntRedBlackTree srcAddr2TgtSeq;
   
   /**
    * info needed to do a map from target aux heap to source aux heap
@@ -96,8 +96,15 @@ public class CasSeqAddrMaps {
   public CasSeqAddrMaps() {
     // this call makes the first real seq number == 1.
     // seq 0 refers to the NULL fs value at heap location 0.
+    this.tgtSeq2SrcAddr = new IntVector();
+    this.srcAddr2TgtSeq = new IntRedBlackTree();
     addItemAddr(0, 0, true);
   }
+  
+  public CasSeqAddrMaps(IntVector tgtSeq2SrcAddr, IntRedBlackTree srcAddr2TgtSeq) {
+    this.tgtSeq2SrcAddr = tgtSeq2SrcAddr;
+    this.srcAddr2TgtSeq = srcAddr2TgtSeq;
+  }
         
   /**
    * Add a new FS address - done during prescan of source
@@ -169,4 +176,11 @@ public class CasSeqAddrMaps {
   public int getNumberSrcFss() {
     return srcAddr2TgtSeq.size();
   }
+
+  CasSeqAddrMaps copy() {
+    CasSeqAddrMaps c = new CasSeqAddrMaps(tgtSeq2SrcAddr.copy(), srcAddr2TgtSeq.copy());
+    c.nextTgt = nextTgt;
+    return c;    
+  }
+  
 }

Modified: uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest.java
URL: http://svn.apache.org/viewvc/uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest.java?rev=1452857&r1=1452856&r2=1452857&view=diff
==============================================================================
--- uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest.java (original)
+++ uima/uimaj/branches/filteredCompress-uima-2498/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest.java Tue Mar  5 15:44:14 2013
@@ -105,6 +105,7 @@ public class SerDesTest extends TestCase
   private CASImpl cas;
   private CASImpl deserCas;
   private CASImpl deltaCas;
+  private CASImpl remoteCas;
 
   private TypeSystemImpl ts;
   private List<FeatureStructure> lfs;
@@ -186,6 +187,7 @@ public class SerDesTest extends TestCase
       this.ts = (TypeSystemImpl) this.cas.getTypeSystem();
       deserCas = (CASImpl) CasCreationUtils.createCas(ts, null, null, null);
       deltaCas = (CASImpl) CasCreationUtils.createCas(ts, null, null, null);
+      remoteCas = (CASImpl) CasCreationUtils.createCas(ts, null, null, null);
       lfs = new ArrayList<FeatureStructure>();
     } catch (Exception e) {
       assertTrue(false);
@@ -197,8 +199,59 @@ public class SerDesTest extends TestCase
     this.ts = null;
     deserCas = null;
     deltaCas = null;
+    remoteCas = null;
     lfs = null;
   }
+
+  public void testDeltaWithStringArrayMod() throws IOException {
+    loadCas(cas);
+    ReuseInfo[] ri = serializeDeserialize(cas, remoteCas, null, null);
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
+    lfs = getIndexedFSs(remoteCas);
+    FeatureStructure fs = lfs.get(10);
+    StringArrayFS sa = (StringArrayFS) fs.getFeatureValue(akofAstring);
+    sa.set(0, "change2");
+    verifyDelta(marker, ri);
+  }
+
+  public void testDeltaWithDblArrayMod() throws IOException {
+    loadCas(cas);
+    ReuseInfo[] ri = serializeDeserialize(cas, remoteCas, null, null);
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
+    lfs = getIndexedFSs(remoteCas);
+    FeatureStructure fs = lfs.get(10);  /* has double array length 2 */
+    DoubleArrayFS d = (DoubleArrayFS) fs.getFeatureValue(akofAdouble);
+    d.set(0, 12.34D);
+    verifyDelta(marker, ri);
+  }
+  
+  public void testDeltaWithByteArrayMod() throws IOException {
+    loadCas(cas);
+    ReuseInfo[] ri = serializeDeserialize(cas, remoteCas, null, null);
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
+    
+    lfs = getIndexedFSs(remoteCas); // gets FSs below the line
+
+    FeatureStructure fs = lfs.get(10);
+    ByteArrayFS sfs = (ByteArrayFS) fs.getFeatureValue(akofAbyte);
+    sfs.set(0, (byte)21);
+
+    verifyDelta(marker, ri);
+  }
+
+  public void testDeltaWithStrArrayMod() throws IOException {
+    loadCas(cas);
+    ReuseInfo[] ri = serializeDeserialize(cas, remoteCas, null, null);
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
+    
+    lfs = getIndexedFSs(remoteCas);
+
+    FeatureStructure fs = lfs.get(10);
+    StringArrayFS sfs = (StringArrayFS) fs.getFeatureValue(akofAstring);
+    sfs.set(0, "change");
+
+    verifyDelta(marker, ri);
+  }
   
   /**
    * Make one of each kind of artifact, including arrays
@@ -206,60 +259,62 @@ public class SerDesTest extends TestCase
    */
   
   public void testAllKinds() {
-    loadCas(lfs);  
+    loadCas(cas);  
     verify();
   }
   
   /**
    * 1) create a base cas with some data
-   * 1a) make a copy of the cas to that point
-   *      Don't use CasCopier - it will reorder the fs's in the heap.
-   *      Instead, use plain binary serialization / deserialization
-   * 2) create the mark: cas.createMarker()
-   * 3) add more cas data
+   * 2) serialize it out and then back into a remoteCas
+   *    This is needed to get the proper ordering in the remote cas of the 
+   *    feature structures - they will be ordered by their fs addr, but omitting
+   *    "skipped" FSs (not in the index or reachable, and not in the target ts)
+   * 2) create the mark in the remoteCAS: cas.createMarker()
+   * 3) add more cas data to the remoteCas
    * 4) serialize with marker
-   * 5) using copy, deserialize with marker
-   * 6) check resulting cas = original in 4)
+   * 5) deserialize back into base cas
+   * 6) check resulting base cas = remote cas
    * 
    * 
    */
   public void testDelta() {
-    loadCas(lfs);
-    // don't use CasCopier because it doesn't preserve the order of fs's in the heap
-    binaryCopyCas(cas, deltaCas);
-    
-    MarkerImpl marker = (MarkerImpl) cas.createMarker();
-    loadCas(lfs);
-    verifyDelta(marker);
+    loadCas(cas);
+    ReuseInfo[] ri = serializeDeserialize(cas, remoteCas, null, null);
+    
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
+    loadCas(remoteCas);
+    verifyDelta(marker, ri);
   }
   
   public void testDeltaWithRefsBelow() {
     lfs.clear();
-    loadCas(lfs);
-    binaryCopyCas(cas, deltaCas);
-    MarkerImpl marker = (MarkerImpl) cas.createMarker();
+    loadCas(cas);
+    ReuseInfo ri[] = serializeDeserialize(cas, remoteCas, null, null);
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
     
-    FeatureStructure fs = cas.createFS(akof);
+    lfs = getIndexedFSs(remoteCas);
+    FeatureStructure fs = remoteCas.createFS(akof);
     fs.setFeatureValue(akofFs, lfs.get(0));
-    ArrayFS fsafs = cas.createArrayFS(4);
+    ArrayFS fsafs = remoteCas.createArrayFS(4);
     fsafs.set(1, lfs.get(1));
     fsafs.set(2, lfs.get(2));
     fsafs.set(3, lfs.get(3));
     fs.setFeatureValue(akofAfs, fsafs);
     
-    verifyDelta(marker);
+    verifyDelta(marker, ri);
   }
 
   public void testDeltaWithMods() {
     lfs.clear();
-    loadCas(lfs);
-    binaryCopyCas(cas, deltaCas);
-    MarkerImpl marker = (MarkerImpl) cas.createMarker();
+    loadCas(cas);
+    ReuseInfo ri[] = serializeDeserialize(cas, remoteCas, null, null);
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
     
-    FeatureStructure fs = cas.createFS(akof);
+    lfs = getIndexedFSs(remoteCas);
+    FeatureStructure fs = remoteCas.createFS(akof);
     lfs.get(0).setFeatureValue(akofFs, fs);
     
-    verifyDelta(marker);
+    verifyDelta(marker, ri);
   }
   
   /**
@@ -288,13 +343,14 @@ public class SerDesTest extends TestCase
   }
 
   public void checkDeltaWithAllMods(Random r) {
-    makeRandomFss(7, lfs, r);
-    loadCas(lfs);
-    binaryCopyCas(cas, deltaCas);
-    MarkerImpl marker = (MarkerImpl) cas.createMarker();
-    int belowMarkSize = lfs.size();
+    makeRandomFss(cas, 7, r);
+    loadCas(cas);
+    ReuseInfo ri[] = serializeDeserialize(cas, remoteCas, null, null);
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
+    
+    lfs = getIndexedFSs(remoteCas);
     
-    makeRandomFss(8, lfs, r);
+    makeRandomFss(remoteCas, 8, r);
 
     int i = 0;
     for (FeatureStructure fs : lfs) {
@@ -303,28 +359,33 @@ public class SerDesTest extends TestCase
       }
     }
     
-    makeRandomUpdatesBelowMark(lfs, belowMarkSize, r);
+    makeRandomUpdatesBelowMark(remoteCas, r);
     
-    verifyDelta(marker);
+    verifyDelta(marker, ri);
 
   }
+  
 
-  public void testDeltaWithIndexMods() {
-    loadCas(lfs);
-    binaryCopyCas(cas, deltaCas);
-    MarkerImpl marker = (MarkerImpl) cas.createMarker();
-    List<FeatureStructure> lfs2 = new ArrayList<FeatureStructure>();
-    loadCas(lfs2);
+  
+  public void testDeltaWithIndexMods() throws IOException {
+    loadCas(cas);
+    ReuseInfo ri[] = serializeDeserialize(cas, remoteCas, null, null);
+    MarkerImpl marker = (MarkerImpl) remoteCas.createMarker();
     
-    cas.getIndexRepository().removeFS(lfs.get(0));
-    cas.getIndexRepository().removeFS(lfs.get(1));
-    cas.getIndexRepository().addFS(lfs.get(1));  // should appear as reindexed
+    lfs = new ArrayList<FeatureStructure>();
+    loadCas(remoteCas);
+    
+    List<FeatureStructure> lfs2 = getIndexedFSs(remoteCas);
+
+    remoteCas.getIndexRepository().removeFS(lfs2.get(0));
+    remoteCas.getIndexRepository().removeFS(lfs2.get(1));
+    remoteCas.getIndexRepository().addFS(lfs2.get(1));  // should appear as reindexed
 
-    cas.getIndexRepository().removeFS(lfs2.get(0));
-    cas.getIndexRepository().removeFS(lfs2.get(1));
-    cas.getIndexRepository().addFS(lfs2.get(1)); 
+    remoteCas.getIndexRepository().removeFS(lfs.get(0));
+    remoteCas.getIndexRepository().removeFS(lfs.get(1));
+    remoteCas.getIndexRepository().addFS(lfs.get(1)); 
 
-    verifyDelta(marker);
+    verifyDelta(marker, ri);
   }
   
   public void testWithOtherSerializer() {
@@ -333,7 +394,7 @@ public class SerDesTest extends TestCase
     tearDown(); setUp();
     testDeltaWithRefsBelow();
     tearDown(); setUp();
-    testDeltaWithAllMods();
+//    testDeltaWithAllMods();
     tearDown(); setUp();
     testAllKinds();
     tearDown(); setUp();
@@ -347,13 +408,13 @@ public class SerDesTest extends TestCase
      * Make equal items,
      * ser/deser, update one of the equal items, insure other not updated
      */
-    FeatureStructure fsAt1 = newAkof(fsl);
-    FeatureStructure fsAt2 = newAkof(fsl);
+    FeatureStructure fsAt1 = newAkof(cas, fsl);
+    FeatureStructure fsAt2 = newAkof(cas, fsl);
     cas.addFsToIndexes(fsAt1);
     cas.addFsToIndexes(fsAt2);
 
-    createStringA(fsAt1, "at");
-    createStringA(fsAt2, "at");
+    createStringA(cas, fsAt1, "at");
+    createStringA(cas, fsAt2, "at");
     verify();
     
     FSIterator<FeatureStructure> it = deserCas.indexRepository.getAllIndexedFS(akof);
@@ -366,13 +427,13 @@ public class SerDesTest extends TestCase
     assertEquals(sa1.get(1), "def");
     cas.reset();
     
-    fsAt1 = newAkof(fsl);
-    fsAt2 = newAkof(fsl);
+    fsAt1 = newAkof(cas, fsl);
+    fsAt2 = newAkof(cas, fsl);
     cas.addFsToIndexes(fsAt1);
     cas.addFsToIndexes(fsAt2);
 
-    createLongA(fsAt1, 9);
-    createLongA(fsAt2, 9);
+    createLongA(cas, fsAt1, 9);
+    createLongA(cas, fsAt2, 9);
     verify();
     
     it = deserCas.indexRepository.getAllIndexedFS(akof);
@@ -387,11 +448,24 @@ public class SerDesTest extends TestCase
   
   
   
-  /*******************************
-   * Helper functions
-   *******************************/
+//  /*******************************
+//   * Helper functions
+//   * @throws IOException 
+//   *******************************/
+//  private ReuseInfo getReuseInfo() {
+//    BinaryCasSerDes6 bcs = new BinaryCasSerDes6(cas); 
+//    ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+//    try {
+//      bcs.serialize(baos);
+//    } catch (IOException e) {
+//      // TODO Auto-generated catch block
+//      e.printStackTrace();
+//    }
+//    return bcs.getReuseInfo();
+//  }
+  
   
-  private void createStringA(FeatureStructure fs, String x) {
+  private void createStringA(CASImpl cas, FeatureStructure fs, String x) {
     StringArrayFS strafs = cas.createStringArrayFS(5);
     strafs.set(3, null);
     strafs.set(2, "" + x);
@@ -401,7 +475,7 @@ public class SerDesTest extends TestCase
     fs.setFeatureValue(akofAstring, strafs);
   }
   
-  private void createIntA (FeatureStructure fs, int x) {
+  private void createIntA (CASImpl cas, FeatureStructure fs, int x) {
     IntArrayFS iafs = cas.createIntArrayFS(4 + x);
     iafs.set(0, Integer.MAX_VALUE - x);
     iafs.set(1, Integer.MIN_VALUE + x);
@@ -409,7 +483,7 @@ public class SerDesTest extends TestCase
     fs.setFeatureValue(akofAint, iafs);
   }
   
-  private void createFloatA (FeatureStructure fs, float x) {
+  private void createFloatA (CASImpl cas, FeatureStructure fs, float x) {
     FloatArrayFS fafs = cas.createFloatArrayFS(6);
     fafs.set(0, Float.MAX_VALUE - x);
 //    fafs.set(1, Float.MIN_NORMAL + x);
@@ -420,7 +494,7 @@ public class SerDesTest extends TestCase
     fs.setFeatureValue(akofAfloat, fafs);
   }
 
-  private void createDoubleA (FeatureStructure fs, double x) {
+  private void createDoubleA (CASImpl cas, FeatureStructure fs, double x) {
     DoubleArrayFS fafs = cas.createDoubleArrayFS(6);
     fafs.set(0, Double.MAX_VALUE - x);
 //    fafs.set(1, Double.MIN_NORMAL + x);
@@ -431,7 +505,7 @@ public class SerDesTest extends TestCase
     fs.setFeatureValue(akofAdouble, fafs);
   }
 
-  private void createLongA (FeatureStructure fs, long x) {
+  private void createLongA (CASImpl cas, FeatureStructure fs, long x) {
     LongArrayFS lafs = cas.createLongArrayFS(4);
     lafs.set(0, Long.MAX_VALUE - x);
     lafs.set(1, Long.MIN_VALUE + x);
@@ -439,21 +513,21 @@ public class SerDesTest extends TestCase
     fs.setFeatureValue(akofAlong, lafs);
   }
   
-  private void binaryCopyCas(CASImpl c1, CASImpl c2) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    Serialization.serializeCAS(cas, baos);
-    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-    c2.reinit(bais);
-  }
+//  private void binaryCopyCas(CASImpl c1, CASImpl c2) {
+//    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+//    Serialization.serializeCAS(cas, baos);
+//    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+//    c2.reinit(bais);
+//  }
   
-  private FeatureStructure newAkof(List<FeatureStructure> fsl) {
+  private FeatureStructure newAkof(CASImpl cas, List<FeatureStructure> fsl) {
     FeatureStructure fs = cas.createFS(akof);
     fsl.add(fs);
     return fs;
   }
   
   // make an instance of akof with all features set
-  private FeatureStructure makeAkof(Random r) {
+  private FeatureStructure makeAkof(CASImpl cas, Random r) {
     FeatureStructure fs = cas.createFS(akof);
     fs.setBooleanValue(akofBoolean, r.nextBoolean());
     fs.setByteValue(akofByte, (byte)r.nextInt());
@@ -465,15 +539,15 @@ public class SerDesTest extends TestCase
     fs.setStringValue(akofString, randomString(r));
     fs.setFeatureValue(akofFs, fs);
     
-    fs.setFeatureValue(akofAint, randomIntA(r));
+    fs.setFeatureValue(akofAint, randomIntA(cas, r));
     fs.setFeatureValue(akofAfs, cas.createArrayFS(1));
-    fs.setFeatureValue(akofAfloat, randomFloatA(r));
-    fs.setFeatureValue(akofAdouble, randomDoubleA(r));
-    fs.setFeatureValue(akofAlong, randomLongA(r));
-    fs.setFeatureValue(akofAshort, randomShortA(r));
-    fs.setFeatureValue(akofAbyte, randomByteA(r));
+    fs.setFeatureValue(akofAfloat, randomFloatA(cas, r));
+    fs.setFeatureValue(akofAdouble, randomDoubleA(cas, r));
+    fs.setFeatureValue(akofAlong, randomLongA(cas, r));
+    fs.setFeatureValue(akofAshort, randomShortA(cas, r));
+    fs.setFeatureValue(akofAbyte, randomByteA(cas, r));
     fs.setFeatureValue(akofAboolean, cas.createBooleanArrayFS(2));
-    fs.setFeatureValue(akofAstring, randomStringA(r));
+    fs.setFeatureValue(akofAstring, randomStringA(cas, r));
 
     return fs;    
   }
@@ -485,7 +559,7 @@ public class SerDesTest extends TestCase
     return stringValues[r.nextInt(7)];
   }
 
-  private StringArrayFS randomStringA(Random r) {
+  private StringArrayFS randomStringA(CASImpl cas, Random r) {
     int length = r.nextInt(2) + 1;
     StringArrayFS fs = cas.createStringArrayFS(length);
     for (int i = 0; i < length; i++) {
@@ -495,7 +569,7 @@ public class SerDesTest extends TestCase
   }
 
   
-  private IntArrayFS randomIntA(Random r) {
+  private IntArrayFS randomIntA(CASImpl cas, Random r) {
     int length = r.nextInt(2) + 1;
     IntArrayFS fs = cas.createIntArrayFS(length);
     for (int i = 0; i < length; i++) {
@@ -507,7 +581,7 @@ public class SerDesTest extends TestCase
   private static final byte[] byteValues = {
     1, 0, -1, Byte.MAX_VALUE, Byte.MIN_VALUE, 9, -9  };
   
-  private ByteArrayFS randomByteA(Random r) {
+  private ByteArrayFS randomByteA(CASImpl cas, Random r) {
     int length = r.nextInt(2) + 1;
     ByteArrayFS fs = cas.createByteArrayFS(length);
     for (int i = 0; i < length; i++) {
@@ -519,7 +593,7 @@ public class SerDesTest extends TestCase
   private static final long[] longValues = {
     1L, 0L, -1L, Long.MAX_VALUE, Long.MIN_VALUE, 11L, -11L  };
   
-  private LongArrayFS randomLongA(Random r) {
+  private LongArrayFS randomLongA(CASImpl cas, Random r) {
     int length = r.nextInt(2) + 1;
     LongArrayFS fs = cas.createLongArrayFS(length);
     for (int i = 0; i < length; i++) {
@@ -531,7 +605,7 @@ public class SerDesTest extends TestCase
   private static final short[] shortValues = {
     1, 0, -1, Short.MAX_VALUE, Short.MIN_VALUE, 22, -22  };
   
-  private ShortArrayFS randomShortA(Random r) {
+  private ShortArrayFS randomShortA(CASImpl cas, Random r) {
     int length = r.nextInt(2) + 1;
     ShortArrayFS fs = cas.createShortArrayFS(length);
     for (int i = 0; i < length; i++) {
@@ -543,7 +617,7 @@ public class SerDesTest extends TestCase
   private static final double[] doubleValues = {
     1d, 0d, -1d, Double.MAX_VALUE, /*Double.MIN_NORMAL,*/ Double.MIN_VALUE, 33d, -33.33d  };
   
-  private DoubleArrayFS randomDoubleA(Random r) {
+  private DoubleArrayFS randomDoubleA(CASImpl cas, Random r) {
     int length = r.nextInt(2) + 1;
     DoubleArrayFS fs = cas.createDoubleArrayFS(length);
     for (int i = 0; i < length; i++) {
@@ -555,7 +629,7 @@ public class SerDesTest extends TestCase
   private static final float[] floatValues = {
     1f, 0f, -1f, Float.MAX_VALUE, /*Float.MIN_NORMAL,*/ Float.MIN_VALUE, 17f, -22.33f  };
   
-  private FloatArrayFS randomFloatA(Random r) {
+  private FloatArrayFS randomFloatA(CASImpl cas, Random r) {
     int length = r.nextInt(2) + 1;
     FloatArrayFS fs = cas.createFloatArrayFS(length);
     for (int i = 0; i < length; i++) {
@@ -564,20 +638,24 @@ public class SerDesTest extends TestCase
     return fs;
   }
   
-  private void makeRandomFss(int n, List<FeatureStructure> fss, Random r) {
+  private void makeRandomFss(CASImpl cas, int n, Random r) {
     List<FeatureStructure> lfss = new ArrayList<FeatureStructure>();
     for (int i = 0; i < n; i++) {
-      FeatureStructure fs = makeAkof(r);
-      lfss.add(fs);
-      fss.add(fs);
+      FeatureStructure fs = makeAkof(cas, r);
+      if (r.nextBoolean()) {
+        cas.addFsToIndexes(fs);
+        lfss.add(fs);
+        lfs.add(fs);
+      }
     }
     for (FeatureStructure fs : lfss) {
       fs.setFeatureValue(akofFs, lfss.get(r.nextInt(lfss.size())));
     }
   }
   
-  private void loadCas(List<FeatureStructure> fsl) {
-    FeatureStructure fs = newAkof(fsl);
+  private void loadCas(CASImpl cas) {
+    /* lfs index: 0 */
+    FeatureStructure fs = newAkof(cas, lfs);
     fs.setBooleanValue(akofBoolean, true);
     fs.setByteValue(akofByte, (byte)109);
     fs.setShortValue(akofShort, (short) 23);
@@ -591,7 +669,8 @@ public class SerDesTest extends TestCase
     FeatureStructure fs1 = fs;
     
     //extreme or unusual values
-    fs = newAkof(fsl);
+    /* lfs index: 1 */
+    fs = newAkof(cas, lfs);
     fs.setBooleanValue(akofBoolean, false);
     fs.setByteValue(akofByte, Byte.MAX_VALUE);
     fs.setShortValue(akofShort, (short) Short.MAX_VALUE);
@@ -603,7 +682,8 @@ public class SerDesTest extends TestCase
     fs.setFeatureValue(akofFs, fs1);
     cas.addFsToIndexes(fs);
 
-    fs = newAkof(fsl);
+    /* lfs index: 2 */
+    fs = newAkof(cas, lfs);
     fs.setByteValue(akofByte, Byte.MIN_VALUE);
     fs.setShortValue(akofShort, (short) Short.MIN_VALUE);
     fs.setIntValue(akofInt, Integer.MIN_VALUE);
@@ -615,7 +695,8 @@ public class SerDesTest extends TestCase
     cas.addFsToIndexes(fs);
     FeatureStructure fs3 = fs;
 
-    fs = newAkof(fsl);
+    /* lfs index: 3 */
+    fs = newAkof(cas, lfs);
     fs.setByteValue(akofByte, (byte)0);
     fs.setShortValue(akofShort, (short) 0);
     fs.setIntValue(akofInt, 0);
@@ -627,7 +708,8 @@ public class SerDesTest extends TestCase
     fs3.setFeatureValue(akofFs, fs);  // make a forward ref
     FeatureStructure fs4 = fs;
 
-    fs = newAkof(fsl);
+    /* lfs index: 4 */
+    fs = newAkof(cas, lfs);
     fs.setByteValue(akofByte, (byte)1);
     fs.setShortValue(akofShort, (short)1);
     fs.setIntValue(akofInt, 1);
@@ -636,34 +718,39 @@ public class SerDesTest extends TestCase
     fs.setDoubleValue(akofDouble, 1.0D);
     cas.addFsToIndexes(fs);
     
-//    fs = newAkof(fsl);
+//    fs = newAkof(cas, lfs);
 //    fs.setFloatValue(akofFloat, Float.MIN_NORMAL);
 //    fs.setDoubleValue(akofDouble, Double.MIN_NORMAL);
 //    cas.addFsToIndexes(fs);
     
-    fs = newAkof(fsl);
+    /* lfs index: 5 */
+    fs = newAkof(cas, lfs);
     fs.setFloatValue(akofFloat, Float.MIN_VALUE);
     fs.setDoubleValue(akofDouble, Double.MIN_VALUE);
     cas.addFsToIndexes(fs);
 
-    fs = newAkof(fsl);
+    /* lfs index: 6 */
+    fs = newAkof(cas, lfs);
     fs.setFloatValue(akofFloat, Float.NaN);
     fs.setDoubleValue(akofDouble, Double.NaN);
     cas.addFsToIndexes(fs);
 
-    fs = newAkof(fsl);
+    /* lfs index: 7 */
+    fs = newAkof(cas, lfs);
     fs.setFloatValue(akofFloat, Float.POSITIVE_INFINITY);
     fs.setDoubleValue(akofDouble, Double.POSITIVE_INFINITY);
     cas.addFsToIndexes(fs);
 
-    fs = newAkof(fsl);
+    /* lfs index: 8 */
+    fs = newAkof(cas, lfs);
     fs.setFloatValue(akofFloat, Float.NEGATIVE_INFINITY);
     fs.setDoubleValue(akofDouble, Double.NEGATIVE_INFINITY);
     cas.addFsToIndexes(fs);
 
     
     // test arrays
-    fs = newAkof(fsl);
+    /* lfs index: 9 */
+    fs = newAkof(cas, lfs);
     fs.setFeatureValue(akofAint, cas.createIntArrayFS(0));
     fs.setFeatureValue(akofAfs, cas.createArrayFS(0));
     fs.setFeatureValue(akofAfloat, cas.createFloatArrayFS(0));
@@ -676,7 +763,8 @@ public class SerDesTest extends TestCase
     cas.addFsToIndexes(fs);
     FeatureStructure fs8 = fs;
 
-    fs = newAkof(fsl);
+    /* lfs index: 10 */
+    fs = newAkof(cas, lfs);
     fs.setFeatureValue(akofAint, cas.createIntArrayFS(2));
     fs.setFeatureValue(akofAfs, cas.createArrayFS(2));
     fs.setFeatureValue(akofAfloat, cas.createFloatArrayFS(2));
@@ -688,21 +776,23 @@ public class SerDesTest extends TestCase
     fs.setFeatureValue(akofAstring, cas.createStringArrayFS(2));
     cas.addFsToIndexes(fs);
     
-    fs = newAkof(fsl);
+    /* lfs index: 11 */
+    fs = newAkof(cas, lfs);
     cas.addFsToIndexes(fs);
     
-    createIntA(fs,0);
+    createIntA(cas, fs, 0);
     
     // feature structure array
+    /* lfs index: 12 */
     ArrayFS fsafs = cas.createArrayFS(4);
     fsafs.set(1, fs8);
     fsafs.set(2, fs1);
     fsafs.set(3, fs4);
     fs.setFeatureValue(akofAfs, fsafs);
     
-    createFloatA(fs, 0f);
-    createDoubleA(fs, 0d);
-    createLongA(fs, 0L);
+    createFloatA(cas, fs, 0f);
+    createDoubleA(cas, fs, 0d);
+    createLongA(cas, fs, 0L);
     
     ShortArrayFS safs = cas.createShortArrayFS(4);
     safs.set(0, Short.MAX_VALUE);
@@ -721,8 +811,8 @@ public class SerDesTest extends TestCase
     booafs.set(1, false);
     fs.setFeatureValue(akofAboolean, booafs);
     
-    createStringA(fs, "");
-    makeRandomFss(15, fsl, new Random());
+    createStringA(cas, fs, "");
+    makeRandomFss(cas, 15, new Random());
   }
 
   private void verify() {
@@ -745,21 +835,59 @@ public class SerDesTest extends TestCase
     }    
   }
 
-  private void verifyDelta(MarkerImpl mark) {
+  private ReuseInfo[] serializeDeserialize(
+      CASImpl casSrc, 
+      CASImpl casTgt, 
+      ReuseInfo ri, 
+      MarkerImpl mark) {
+    ReuseInfo[] riToReturn = new ReuseInfo[2];
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+      if (doPlain) {
+        if (null == mark) {
+          Serialization.serializeCAS(cas, baos);
+        } else {
+          Serialization.serializeCAS(casSrc, baos, mark);
+        }
+      } else {
+        BinaryCasSerDes6 bcs = new BinaryCasSerDes6(casSrc, mark);
+        SerializationMeasures sm = bcs.serialize(baos);
+        if (sm != null) {System.out.println(sm);}
+        riToReturn[0] = bcs.getReuseInfo();
+      }
+      ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+      if (doPlain) {
+        casTgt.reinit(bais);
+      } else {
+        BinaryCasSerDes6 bcsDeserRmt = new BinaryCasSerDes6(casTgt);
+        bcsDeserRmt.deserialize(bais);
+        riToReturn[1] = bcsDeserRmt.getReuseInfo();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return riToReturn;
+  }
+  
+  private void verifyDelta(MarkerImpl mark, ReuseInfo[] ri) {
     try {
-      ReuseInfo ri = null;
       ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
       if (doPlain) {
-        Serialization.serializeCAS(cas, baos);
+        Serialization.serializeCAS(remoteCas, baos, mark);
       } else {
-        BinaryCasSerDes6 bcs = new BinaryCasSerDes6(cas, mark);
+        BinaryCasSerDes6 bcs = new BinaryCasSerDes6(remoteCas, mark, null, ri[1]);
         SerializationMeasures sm = bcs.serialize(baos);
         if (sm != null) {System.out.println(sm);}
-        ri = bcs.getReuseInfo();
       }
       ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-      deltaCas.reinit(bais, ri);
-      assertTrue(BinaryCasSerDes6.compareCASes(cas, deltaCas));
+      if (doPlain) {
+        cas.reinit(bais);
+      } else {
+          BinaryCasSerDes6 bcsDeserialize = new BinaryCasSerDes6(cas, null, null, ri[0]);
+//        cas.reinit(bais, ri);
+          bcsDeserialize.deserialize(bais);
+      }
+      assertTrue(BinaryCasSerDes6.compareCASes(cas, remoteCas));
       
       // verify indexed fs same, and in same order - already done by compareCASes
 //      int[] fsIndexes1 = cas.getIndexedFSs();
@@ -770,13 +898,13 @@ public class SerDesTest extends TestCase
     }
   }
   
-  private void makeRandomUpdatesBelowMark(List<FeatureStructure> fs, int belowMarkSize, Random r) {
-    for (int i = 0; i < belowMarkSize; i++ ) {
-      makeRandomUpdate(lfs.get(i), r);
+  private void makeRandomUpdatesBelowMark(CASImpl cas, Random r) {
+    for (FeatureStructure fs : lfs) {
+      makeRandomUpdate(cas, fs, r);
     }
   }
 
-  private void makeRandomUpdate(FeatureStructure fs, Random r) {
+  private void makeRandomUpdate(CASImpl cas, FeatureStructure fs, Random r) {
     int n = r.nextInt(3);
     for (int i = 0 ; i < n; i++) {
       switch (r.nextInt(26)) {
@@ -808,31 +936,31 @@ public class SerDesTest extends TestCase
         fs.setFeatureValue(akofFs, fs);
         break;
       case 9:
-        fs.setFeatureValue(akofAint, randomIntA(r));
+        fs.setFeatureValue(akofAint, randomIntA(cas, r));
         break;
       case 10:
         fs.setFeatureValue(akofAfs, cas.createArrayFS(1));
         break;
       case 11:
-        fs.setFeatureValue(akofAfloat, randomFloatA(r));
+        fs.setFeatureValue(akofAfloat, randomFloatA(cas, r));
         break;
       case 12:
-        fs.setFeatureValue(akofAdouble, randomDoubleA(r));
+        fs.setFeatureValue(akofAdouble, randomDoubleA(cas, r));
         break;
       case 13:
-        fs.setFeatureValue(akofAlong, randomLongA(r));
+        fs.setFeatureValue(akofAlong, randomLongA(cas, r));
         break;
       case 14:
-        fs.setFeatureValue(akofAshort, randomShortA(r));
+        fs.setFeatureValue(akofAshort, randomShortA(cas, r));
         break;
       case 15:
-        fs.setFeatureValue(akofAbyte, randomByteA(r));
+        fs.setFeatureValue(akofAbyte, randomByteA(cas, r));
         break;
       case 16:
         fs.setFeatureValue(akofAboolean, cas.createBooleanArrayFS(2));
         break;
       case 17: 
-        fs.setFeatureValue(akofAstring, randomStringA(r));
+        fs.setFeatureValue(akofAstring, randomStringA(cas, r));
         break;
       case 18: {
           IntArrayFS sfs = (IntArrayFS) fs.getFeatureValue(akofAint);
@@ -894,4 +1022,12 @@ public class SerDesTest extends TestCase
     }
   }
 
+  private List<FeatureStructure> getIndexedFSs(CASImpl cas) {
+    FSIterator<FeatureStructure> it = cas.getIndexRepository().getAllIndexedFS(akof);
+    List<FeatureStructure> lfs = new ArrayList<FeatureStructure>();
+    while (it.hasNext()) {
+      lfs.add(it.next());
+    }
+    return lfs;
+  }
 }