You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2013/04/10 17:07:13 UTC

svn commit: r1466505 - in /uima/uimaj/trunk/uimaj-core/src: main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java main/java/org/apache/uima/cas/impl/CASImpl.java test/java/org/apache/uima/cas/impl/SerDesTest6.java

Author: schor
Date: Wed Apr 10 15:07:12 2013
New Revision: 1466505

URL: http://svn.apache.org/r1466505
Log:
[UIMA-2498 UIMA-2778] add test case for type filtering while deserializing and fix many bugs it exposed.  Add support for parallel step

Modified:
    uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
    uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
    uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java

Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java?rev=1466505&r1=1466504&r2=1466505&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java Wed Apr 10 15:07:12 2013
@@ -198,6 +198,11 @@ import org.apache.uima.util.impl.Seriali
 public class BinaryCasSerDes6 {
 
   private static final int[] INT0 = new int[0];
+  
+  private static final boolean TRACE_SER = false;
+  private static final boolean TRACE_DES = false;
+  
+  private static final boolean TRACE_STR_ARRAY = false;
   /**
    * Version of the serializer/deserializer, used to allow deserialization of 
    * older versions
@@ -404,6 +409,7 @@ public class BinaryCasSerDes6 {
    * Things for just deserialization  
    **********************************/
 
+  private AllowPreexistingFS allowPreexistingFS;
   private DataInputStream deserIn;
   private int version;
 
@@ -457,6 +463,7 @@ public class BinaryCasSerDes6 {
    *            For normal serialization - can be null, but if not, is used in place of re-calculating, for speed up
    *            For delta deserialization - must not be null, and is the saved value after serializing to the service
    *            For normal deserialization - must be null
+   * @param allowPreexistingFSs - if false, throws error if deserializing delta cas has modifications below the mark, for parallel remotes
    * @param doMeasurements if true, measurements are done (on serialization)
    * @param compressLevel if not null, specifies enum instance for compress level
    * @param compressStrategy if not null, specifies enum instance for compress strategy
@@ -466,7 +473,7 @@ public class BinaryCasSerDes6 {
       MarkerImpl mark,
       TypeSystemImpl tgtTs,
       ReuseInfo rfs,
-      boolean doMeasurements, 
+      boolean doMeasurements,
       CompressLevel compressLevel, 
       CompressStrat compressStrategy) {
     cas = ((CASImpl) ((aCas instanceof JCas) ? ((JCas)aCas).getCas(): aCas)).getBaseCAS();
@@ -482,7 +489,6 @@ public class BinaryCasSerDes6 {
     this.sm = doMeasurements ? new SerializationMeasures() : null;
     
     isDelta = isSerializingDelta = (mark != null);
-    doMeasurements = (sm != null);
     typeMapperCmn = typeMapper = ts.getTypeSystemMapper(tgtTs);
     isTypeMappingCmn = isTypeMapping = (null != typeMapper);
     
@@ -673,6 +679,9 @@ public class BinaryCasSerDes6 {
       }
       final int tCode = heap[iHeap];  // get type code
       final int mappedTypeCode = isTypeMapping ? typeMapper.mapTypeCodeSrc2Tgt(tCode) : tCode;
+      if (TRACE_SER) {
+        System.out.format("Ser: adr: %,d tCode: %s %,d tgtTypeCode: %,d %n", iHeap, tCode, ts.getTypeInfo(tCode), mappedTypeCode);
+      }
       if (mappedTypeCode == 0) { // means no corresponding type in target system
         continue;
       }
@@ -760,6 +769,9 @@ public class BinaryCasSerDes6 {
       break;
     case Slot_StrRef:
       for (int i = iHeap + 2; i < endi; i++) {
+        if (TRACE_STR_ARRAY) {
+          System.out.format("Trace Str Array Ser: addr: %,d string=%s%n", i, stringHeapObj.getStringForCode(heap[i]));
+        }
         writeString(stringHeapObj.getStringForCode(heap[i]));
       }
       break;
@@ -1541,10 +1553,7 @@ public class BinaryCasSerDes6 {
    *************************************************************************************/   
 
   public void deserialize(InputStream istream) throws IOException {
-    deserIn = (istream instanceof DataInputStream) ? (DataInputStream) istream
-        : new DataInputStream(istream);
-
-    readHeader();
+    readHeader(istream);
 
     if (isReadingDelta) {
       if (!reuseInfoProvided) {
@@ -1554,10 +1563,36 @@ public class BinaryCasSerDes6 {
       cas.resetNoQuestions();
     }
 
-    deserializeAfterVersion(deserIn, isReadingDelta);
+    deserializeAfterVersion(deserIn, isReadingDelta, AllowPreexistingFS.allow);
   }
   
-  public void deserializeAfterVersion(DataInputStream istream, boolean isDelta) throws IOException {
+  /**
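+   * Version used by UIMA-AS to read a delta CAS from remote parallel steps.
+   * @param istream the stream to read the serialized delta CAS from
+   * @param allowPreexistingFS whether the delta CAS may modify feature structures that existed before the mark
+   * @throws IOException if reading from the stream fails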
+   */
+  public void deserialize(InputStream istream, AllowPreexistingFS allowPreexistingFS) throws IOException {
+    readHeader(istream);
+
+    if (isReadingDelta) {
+      if (!reuseInfoProvided) {
+        throw new UnsupportedOperationException("Deserializing Delta Cas, but original not serialized from");
+      }
+    } else {
+      throw new UnsupportedOperationException("Delta CAS required for this call");
+    }
+
+    deserializeAfterVersion(deserIn, isReadingDelta, allowPreexistingFS);
+  }
+  
+  
+  public void deserializeAfterVersion(DataInputStream istream, boolean isDelta, AllowPreexistingFS allowPreexistingFS) throws IOException {
+
+    this.allowPreexistingFS = allowPreexistingFS;
+    if (allowPreexistingFS == AllowPreexistingFS.ignore) {
+      throw new UnsupportedOperationException("AllowPreexistingFS.ignore not an allowed setting");
+    }
 
     deserIn = istream;
     this.isDelta = isReadingDelta = isDelta;
@@ -1604,7 +1639,6 @@ public class BinaryCasSerDes6 {
     //     Above the merge line: only the 2nd is possible
 
     if (isReadingDelta) {
-      // scan current source being added to / merged into
       if (!reuseInfoProvided) {
         throw new IllegalStateException("Reading Delta into CAS not serialized from");
       }
@@ -1621,6 +1655,7 @@ public class BinaryCasSerDes6 {
       }
       final int tgtTypeCode = readVnumber(typeCode_dis); // get type code
       final int srcTypeCode = isTypeMapping ? typeMapper.mapTypeCodeTgt2Src(tgtTypeCode) : tgtTypeCode;
+      
       final boolean storeIt = (srcTypeCode != 0);
       // A receiving client from a service always
       // has a superset of the service's types due to type merging so this
@@ -1629,8 +1664,8 @@ public class BinaryCasSerDes6 {
       // deleted a type.
     
       // The strategy for deserializing heap refs depends on finding
-      // the prev value for that type.  This can be skipped for 
-      // types being skipped because they don't exist in the source
+      // the prev value for that type.  This must be done in the context 
+      // of the sending CAS's type system
     
       // typeInfo is Target Type Info
       final TypeInfo tgtTypeInfo = isTypeMapping ? tgtTs.getTypeInfo(tgtTypeCode) :
@@ -1643,7 +1678,14 @@ public class BinaryCasSerDes6 {
         typeInfo = tgtTypeInfo;
         initPrevIntValue(iHeap);  // note "typeInfo" a hidden parameter - ugly...
       }
-      typeInfo = srcTypeInfo;
+      if (TRACE_DES) {
+        System.out.format("Des: addr %,d tgtTypeCode: %,d %s srcTypeCode: %,d%n", iHeap, tgtTypeCode, tgtTypeInfo,  srcTypeCode);
+      }
+
+//      if (srcTypeInfo == null) {
+//        typeInfo = null;  // debugging
+//      }
+      typeInfo = storeIt ? srcTypeInfo : tgtTypeInfo; // if !storeIt, then srcTypeInfo is null.
       
       fsStartIndexes.addSrcAddrForTgt(iHeap, storeIt);
       if (storeIt) {
@@ -1667,7 +1709,7 @@ public class BinaryCasSerDes6 {
           final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(srcTypeCode);
           for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
             final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1;
-            SlotKind kind = typeInfo.getSlotKind(featOffsetInSrc);
+            SlotKind kind = tgtTypeInfo.slotKinds[i];  // target kind , may not exist in src
             readByKind(iHeap, featOffsetInSrc, kind, storeIt);
           }
         } else {
@@ -1767,7 +1809,7 @@ public class BinaryCasSerDes6 {
         for (int i = startIheap; i < endi; i++) {
           final int v = readDiff(arrayElementKind, prev);
           prev = v;
-          if (startIheap == i && isUpdatePrevOK) {
+          if (startIheap == i && isUpdatePrevOK && storeIt) {
             updatePrevIntValue(iHeap, 2, v);
           }
           if (storeIt) {
@@ -1790,7 +1832,10 @@ public class BinaryCasSerDes6 {
       break;
     case Slot_StrRef:
       for (int i = iHeap + 2; i < endi; i++) {
-        final int strRef = readString(storeIt); 
+        final int strRef = readString(storeIt);
+        if (TRACE_STR_ARRAY) {
+          System.out.format("Trace String Array Des addr: %,d storeIt=%s, string=%s%n", i, storeIt ? "Y" : "N", stringHeapObj.getStringForCode(strRef));
+        }
         if (storeIt) {
           heap[i] = strRef; 
         }
@@ -1809,7 +1854,7 @@ public class BinaryCasSerDes6 {
    */
   private void readByKind(int iHeap, int offset, SlotKind kind, boolean storeIt) throws IOException {
     
-    if (offset == -1) {
+    if (offset == 0) {
       storeIt = false;
     }
     switch (kind) {
@@ -1842,7 +1887,7 @@ public class BinaryCasSerDes6 {
       }
       break;
     case Slot_LongRef: {
-      long v = readLongOrDouble(kind, (iPrevHeap == 0) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
+      long v = readLongOrDouble(kind, (!storeIt || (iPrevHeap == 0)) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
       if (v == 0L) {
         if (longZeroIndex == -1) {
           longZeroIndex = longHeapObj.addLong(0L);
@@ -1954,7 +1999,7 @@ public class BinaryCasSerDes6 {
       byte_dis.readFully(byteHeapObj.heap, startPos, length);
       return startPos;
     } else {
-      byte_dis.skipBytes(length);
+      skipBytes(byte_dis, length);
       return 0;
     }
   }
@@ -1970,7 +2015,7 @@ public class BinaryCasSerDes6 {
       }
       return startPos;
     } else {
-      short_dis.skipBytes(length * 2);
+      skipBytes(short_dis, length * 2);
       return 0;
     }
   }
@@ -2009,7 +2054,7 @@ public class BinaryCasSerDes6 {
     } else {
       v = readDiff(kind, 0);
     }
-    if (isUpdatePrevOK) {
+    if (storeIt && isUpdatePrevOK) {
       updatePrevIntValue(iHeap, offset, v);
     }
   }
@@ -2057,8 +2102,8 @@ public class BinaryCasSerDes6 {
   
   private void skipLong(final int length) throws IOException {
     for (int i = 0; i < length; i++) {
-      long_High_dis.skipBytes(8);
-      long_Low_dis.skipBytes(8);
+      skipBytes(long_High_dis, 8);
+      skipBytes(long_Low_dis, 8);
     }
   }
   
@@ -2126,15 +2171,19 @@ public class BinaryCasSerDes6 {
       return 0;
     }
     if (1 == length) {
-      if (storeIt) {
+      // always store the empty string, in case a later string refers back to it by offset
+//      if (storeIt) {
         return stringHeapObj.addString("");
-      } else {
-        return 0;
-      }
+//      } else {
+//        return 0;
+//      }
     }
     
     if (length < 0) {  // in this case, -length is the slot index
       if (storeIt) {
+        if (TRACE_STR_ARRAY) {
+          System.out.format("Trace String Array Des ref to offset %,d%n", length);
+        }
         return stringTableOffset - length;
       } else {
         return 0;
@@ -2146,11 +2195,20 @@ public class BinaryCasSerDes6 {
     if (debugEOF) {
       System.out.format("readString offset = %,d%n", offset);
     }
-    if (storeIt) {
+    // need to store all strings, because an otherwise skipped one may be referenced
+    //   later as an offset into the string table
+//    if (storeIt) {
       String s =  readCommonString[segmentIndex].substring(offset, offset + length - 1);
       return stringHeapObj.addString(s);
-    } else {
-      return 0;
+//    } else {
+//      return 0;
+//    }
+  }
+  
+  private void skipBytes(DataInputStream stream, int skipNumber) throws IOException {
+    final int r = stream.skipBytes(skipNumber);
+    if (r == 0) {
+      throw new IOException("0 bytes skipped, causing out-of-synch while deserializing");
     }
   }
 
@@ -2188,6 +2246,13 @@ public class BinaryCasSerDes6 {
       final int modFSsLength = readVnumber(control_dis);
       int prevSeq = 0;
       
+      if ((modFSsLength > 0) && (allowPreexistingFS == AllowPreexistingFS.disallow)) {
+        CASRuntimeException e = new CASRuntimeException(
+          CASRuntimeException.DELTA_CAS_PREEXISTING_FS_DISALLOWED,
+            new String[] {String.format("%,d pre-existing Feature Structures modified", modFSsLength)});
+        throw e;
+      }
+      
 //        if (isTypeMapping) {
 //          for (int i = 0; i < AuxHeapsCount; i++) {
 //            srcHeapIndexOffset[i] = 0;
@@ -2205,7 +2270,7 @@ public class BinaryCasSerDes6 {
 
         iHeap = fsStartIndexes.getSrcAddrFromTgtSeq(seqNbrModified);
         if (iHeap < 1) {
-          // never happen because delta CAS ts system case, the 
+          // should never happen: in the delta CAS type-system use case, the
           //   target is always a subset of the source
           //   due to type system merging
           throw new RuntimeException("never happen");
@@ -2560,9 +2625,9 @@ public class BinaryCasSerDes6 {
   
     if (isTypeMapping) {
       final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(tCode);
-      if (tgtFeatOffsets2Src == null ) {
-        System.out.println("debug caught");
-      }
+//      if (tgtFeatOffsets2Src == null ) {
+//        System.out.println("debug caught");
+//      }
       for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
         final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1;  // add one for origin 1
         if (featOffsetInSrc == 0) {
@@ -2693,6 +2758,8 @@ public class BinaryCasSerDes6 {
    * Compare 2 CASes, with perhaps different type systems.
    * If the type systems are different, construct a type mapper and use that
    *   to selectively ignore types or features not in other type system
+   *   
+   * The Mapper filters C1 -> C2.
    * 
    * Compare only feature structures reachable via indexes or refs
    *   The order must match
@@ -2820,11 +2887,11 @@ public class BinaryCasSerDes6 {
             continue;
           }
           if ((tCode1_2 == 0) && (tCode2_1 != 0)) {
-            i2++;
+            i1++;
             continue;
           }
           if ((tCode1_2 != 0) && (tCode2_1 == 0)) {
-            i1++;
+            i2++;
             continue;
           }
         } else {  // not type mapping
@@ -2837,7 +2904,7 @@ public class BinaryCasSerDes6 {
         }
       }
       
-      if (i1 >= c1FoundFSs.length && i1 >= c2FoundFSs.length) {
+      if (i1 >= c1FoundFSs.length && i2 >= c2FoundFSs.length) {
         return true;  // end, everything compared
       }
       if (isTypeMapping) {
@@ -3083,6 +3150,7 @@ public class BinaryCasSerDes6 {
     private StringBuilder dumpHeapFs(CASImpl cas, final int iHeap, final TypeSystemImpl ts) {
       StringBuilder sb = new StringBuilder();
       typeInfo = ts.getTypeInfo(cas.getHeap().heap[iHeap]);
+      sb.append("Heap Addr: ").append(iHeap).append(' ');
       sb.append(typeInfo).append(' ');
   
       if (typeInfo.isHeapStoredArray) {
@@ -3415,7 +3483,10 @@ public class BinaryCasSerDes6 {
     }
   }
   
-  private void readHeader() throws IOException {
+  private void readHeader(InputStream istream) throws IOException {
+    deserIn = (istream instanceof DataInputStream) ? (DataInputStream) istream
+        : new DataInputStream(istream);
+
     // key
     // determine if byte swap if needed based on key
     byte[] bytebuf = new byte[4];

Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java?rev=1466505&r1=1466504&r2=1466505&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java Wed Apr 10 15:07:12 2013
@@ -1200,7 +1200,7 @@ public class CASImpl extends AbstractCas
         if (compressedVersion == 0) {
           (new BinaryCasSerDes4(this.getTypeSystemImpl(), false)).deserialize(this, dis, delta);
         } else {
-          (new BinaryCasSerDes6(this, rfs)).deserializeAfterVersion(dis, delta);
+          (new BinaryCasSerDes6(this, rfs)).deserializeAfterVersion(dis, delta, AllowPreexistingFS.allow);
         }
         return;
       }

Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java?rev=1466505&r1=1466504&r2=1466505&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java Wed Apr 10 15:07:12 2013
@@ -224,9 +224,9 @@ public class SerDesTest6 extends TestCas
   public SerDesTest6() {
     Random sg = new Random();
     long seed = sg.nextLong();
-//    seed =  -2659090483652661635L;
+//    seed =  2934127305128325787L;
     random = new Random(seed);
-//    System.out.format("RandomSeed: %,d%n", seed);
+    System.out.format("RandomSeed: %,d%n", seed);
 
     mSrc = setupTTypeSystem(TwoTypes);
     casSrc = mSrc.cas;
@@ -283,6 +283,36 @@ public class SerDesTest6 extends TestCas
   public void tearDown() {
   }
   
+  /**
+   * Make one of each kind of artifact, including arrays
+   * serialize to byte stream, deserialize into new cas, compare
+   */
+  
+  public void testAllKinds() {
+    if (doPlain) {
+      serdesSimple(getTT(EqTwoTypes));
+    } else {
+      for (TTypeSystem m : alternateTTypeSystems) {
+        switch (m.kind){
+        // note: case statements *not* grouped in order to facilitate debugging
+        case OneTypeSubsetFeatures:
+          serdesSimple(m);
+          break;
+        case TwoTypesSubsetFeatures:
+          serdesSimple(m);
+          break;
+        case TwoTypes:
+        case EqTwoTypes:
+        case OneType:
+        case TwoTypesNoFeatures:
+          serdesSimple(m);
+          break;
+        }
+      }
+    }
+  }
+
+  
   // Test chains going through filtered type
   //   Repeat below with OneType, and TwoTypes with filtered slot == fsRef
   
@@ -388,39 +418,17 @@ public class SerDesTest6 extends TestCas
     verifyDelta(marker, ri);
   }
   
-  /**
-   * Make one of each kind of artifact, including arrays
-   * serialize to byte stream, deserialize into new cas, compare
-   */
-  
-  public void testAllKinds() {
-    if (doPlain) {
-      serdesSimple(getTT(EqTwoTypes));
-    } else {
-      for (TTypeSystem m : alternateTTypeSystems) {
-        switch (m.kind){
-        case OneTypeSubsetFeatures:
-          serdesSimple(m);
-          break;
-        case TwoTypesSubsetFeatures:
-          serdesSimple(m);
-          break;
-        case TwoTypes:
-        case EqTwoTypes:
-        case OneType:
-        case TwoTypesNoFeatures:
-          serdesSimple(m);
-          break;
-        }
-      }
-    }
-  }
   
   private void serdesSimple(TTypeSystem m) {
     remoteCas = setupCas(m);
     casSrc.reset();
     loadCas(casSrc, mSrc);  
-    verify(remoteCas);    
+    verify(remoteCas);  
+    
+    // test case where serialization is done without type filtering,
+    //   and deserialization is done with filtering
+    remoteCas.reset();
+    verifyDeserFilter(remoteCas); 
   }
   
   /**
@@ -1131,6 +1139,29 @@ public class SerDesTest6 extends TestCas
       throw new RuntimeException(e);
     }    
   }
+  
+  private void verifyDeserFilter(CASImpl casTgt) {
+    // serialize w/o filter
+    BinaryCasSerDes6 bcs = null;
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+      if (doPlain) {
+        return;   
+      } else {      
+        bcs = new BinaryCasSerDes6(casSrc, (ReuseInfo) null);
+        bcs.serialize(baos);
+      }
+      ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+      bcs = new BinaryCasSerDes6(casTgt, null, casSrc.getTypeSystemImpl());
+      bcs.deserialize(bais);
+      
+      bcs = new BinaryCasSerDes6(casSrc, null, casTgt.getTypeSystemImpl());
+      assertTrue(bcs.compareCASes(casSrc, casTgt));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }    
+   
+  }
 
   // casSrc -> remoteCas
   private ReuseInfo[] serializeDeserialize(