You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2013/04/10 17:07:13 UTC
svn commit: r1466505 - in /uima/uimaj/trunk/uimaj-core/src:
main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
main/java/org/apache/uima/cas/impl/CASImpl.java
test/java/org/apache/uima/cas/impl/SerDesTest6.java
Author: schor
Date: Wed Apr 10 15:07:12 2013
New Revision: 1466505
URL: http://svn.apache.org/r1466505
Log:
[UIMA-2498 UIMA-2778] add test case for type filtering while deserializing and fix many bugs it exposed. Add support for parallel step
Modified:
uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java
Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java?rev=1466505&r1=1466504&r2=1466505&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java Wed Apr 10 15:07:12 2013
@@ -198,6 +198,11 @@ import org.apache.uima.util.impl.Seriali
public class BinaryCasSerDes6 {
private static final int[] INT0 = new int[0];
+
+ private static final boolean TRACE_SER = false;
+ private static final boolean TRACE_DES = false;
+
+ private static final boolean TRACE_STR_ARRAY = false;
/**
* Version of the serializer/deserializer, used to allow deserialization of
* older versions
@@ -404,6 +409,7 @@ public class BinaryCasSerDes6 {
* Things for just deserialization
**********************************/
+ private AllowPreexistingFS allowPreexistingFS;
private DataInputStream deserIn;
private int version;
@@ -457,6 +463,7 @@ public class BinaryCasSerDes6 {
* For normal serialization - can be null, but if not, is used in place of re-calculating, for speed up
* For delta deserialization - must not be null, and is the saved value after serializing to the service
* For normal deserialization - must be null
+ * @param allowPreexistingFSs - if false, throws error if deserializing delta cas has modifications below the mark, for parallel remotes
* @param doMeasurements if true, measurements are done (on serialization)
* @param compressLevel if not null, specifies enum instance for compress level
* @param compressStrategy if not null, specifies enum instance for compress strategy
@@ -466,7 +473,7 @@ public class BinaryCasSerDes6 {
MarkerImpl mark,
TypeSystemImpl tgtTs,
ReuseInfo rfs,
- boolean doMeasurements,
+ boolean doMeasurements,
CompressLevel compressLevel,
CompressStrat compressStrategy) {
cas = ((CASImpl) ((aCas instanceof JCas) ? ((JCas)aCas).getCas(): aCas)).getBaseCAS();
@@ -482,7 +489,6 @@ public class BinaryCasSerDes6 {
this.sm = doMeasurements ? new SerializationMeasures() : null;
isDelta = isSerializingDelta = (mark != null);
- doMeasurements = (sm != null);
typeMapperCmn = typeMapper = ts.getTypeSystemMapper(tgtTs);
isTypeMappingCmn = isTypeMapping = (null != typeMapper);
@@ -673,6 +679,9 @@ public class BinaryCasSerDes6 {
}
final int tCode = heap[iHeap]; // get type code
final int mappedTypeCode = isTypeMapping ? typeMapper.mapTypeCodeSrc2Tgt(tCode) : tCode;
+ if (TRACE_SER) {
+ System.out.format("Ser: adr: %,d tCode: %s %,d tgtTypeCode: %,d %n", iHeap, tCode, ts.getTypeInfo(tCode), mappedTypeCode);
+ }
if (mappedTypeCode == 0) { // means no corresponding type in target system
continue;
}
@@ -760,6 +769,9 @@ public class BinaryCasSerDes6 {
break;
case Slot_StrRef:
for (int i = iHeap + 2; i < endi; i++) {
+ if (TRACE_STR_ARRAY) {
+ System.out.format("Trace Str Array Ser: addr: %,d string=%s%n", i, stringHeapObj.getStringForCode(heap[i]));
+ }
writeString(stringHeapObj.getStringForCode(heap[i]));
}
break;
@@ -1541,10 +1553,7 @@ public class BinaryCasSerDes6 {
*************************************************************************************/
public void deserialize(InputStream istream) throws IOException {
- deserIn = (istream instanceof DataInputStream) ? (DataInputStream) istream
- : new DataInputStream(istream);
-
- readHeader();
+ readHeader(istream);
if (isReadingDelta) {
if (!reuseInfoProvided) {
@@ -1554,10 +1563,36 @@ public class BinaryCasSerDes6 {
cas.resetNoQuestions();
}
- deserializeAfterVersion(deserIn, isReadingDelta);
+ deserializeAfterVersion(deserIn, isReadingDelta, AllowPreexistingFS.allow);
}
- public void deserializeAfterVersion(DataInputStream istream, boolean isDelta) throws IOException {
+ /**
+ * Version used by uima-as to read delta cas from remote parallel steps
+ * @param istream
+ * @param allowPreexistingFS whether the delta CAS may contain modifications to Feature Structures below the mark
+ * @throws IOException
+ */
+ public void deserialize(InputStream istream, AllowPreexistingFS allowPreexistingFS) throws IOException {
+ readHeader(istream);
+
+ if (isReadingDelta) {
+ if (!reuseInfoProvided) {
+ throw new UnsupportedOperationException("Deserializing Delta Cas, but original not serialized from");
+ }
+ } else {
+ throw new UnsupportedOperationException("Delta CAS required for this call");
+ }
+
+ deserializeAfterVersion(deserIn, isReadingDelta, allowPreexistingFS);
+ }
+
+
+ public void deserializeAfterVersion(DataInputStream istream, boolean isDelta, AllowPreexistingFS allowPreexistingFS) throws IOException {
+
+ this.allowPreexistingFS = allowPreexistingFS;
+ if (allowPreexistingFS == AllowPreexistingFS.ignore) {
+ throw new UnsupportedOperationException("AllowPreexistingFS.ignore not an allowed setting");
+ }
deserIn = istream;
this.isDelta = isReadingDelta = isDelta;
@@ -1604,7 +1639,6 @@ public class BinaryCasSerDes6 {
// Above the merge line: only the 2nd is possible
if (isReadingDelta) {
- // scan current source being added to / merged into
if (!reuseInfoProvided) {
throw new IllegalStateException("Reading Delta into CAS not serialized from");
}
@@ -1621,6 +1655,7 @@ public class BinaryCasSerDes6 {
}
final int tgtTypeCode = readVnumber(typeCode_dis); // get type code
final int srcTypeCode = isTypeMapping ? typeMapper.mapTypeCodeTgt2Src(tgtTypeCode) : tgtTypeCode;
+
final boolean storeIt = (srcTypeCode != 0);
// A receiving client from a service always
// has a superset of the service's types due to type merging so this
@@ -1629,8 +1664,8 @@ public class BinaryCasSerDes6 {
// deleted a type.
// The strategy for deserializing heap refs depends on finding
- // the prev value for that type. This can be skipped for
- // types being skipped because they don't exist in the source
+ // the prev value for that type. This must be done in the context
+ // of the sending CAS's type system
// typeInfo is Target Type Info
final TypeInfo tgtTypeInfo = isTypeMapping ? tgtTs.getTypeInfo(tgtTypeCode) :
@@ -1643,7 +1678,14 @@ public class BinaryCasSerDes6 {
typeInfo = tgtTypeInfo;
initPrevIntValue(iHeap); // note "typeInfo" a hidden parameter - ugly...
}
- typeInfo = srcTypeInfo;
+ if (TRACE_DES) {
+ System.out.format("Des: addr %,d tgtTypeCode: %,d %s srcTypeCode: %,d%n", iHeap, tgtTypeCode, tgtTypeInfo, srcTypeCode);
+ }
+
+// if (srcTypeInfo == null) {
+// typeInfo = null; // debugging
+// }
+ typeInfo = storeIt ? srcTypeInfo : tgtTypeInfo; // if !storeIt, then srcTypeInfo is null.
fsStartIndexes.addSrcAddrForTgt(iHeap, storeIt);
if (storeIt) {
@@ -1667,7 +1709,7 @@ public class BinaryCasSerDes6 {
final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(srcTypeCode);
for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1;
- SlotKind kind = typeInfo.getSlotKind(featOffsetInSrc);
+ SlotKind kind = tgtTypeInfo.slotKinds[i]; // target kind , may not exist in src
readByKind(iHeap, featOffsetInSrc, kind, storeIt);
}
} else {
@@ -1767,7 +1809,7 @@ public class BinaryCasSerDes6 {
for (int i = startIheap; i < endi; i++) {
final int v = readDiff(arrayElementKind, prev);
prev = v;
- if (startIheap == i && isUpdatePrevOK) {
+ if (startIheap == i && isUpdatePrevOK && storeIt) {
updatePrevIntValue(iHeap, 2, v);
}
if (storeIt) {
@@ -1790,7 +1832,10 @@ public class BinaryCasSerDes6 {
break;
case Slot_StrRef:
for (int i = iHeap + 2; i < endi; i++) {
- final int strRef = readString(storeIt);
+ final int strRef = readString(storeIt);
+ if (TRACE_STR_ARRAY) {
+ System.out.format("Trace String Array Des addr: %,d storeIt=%s, string=%s%n", i, storeIt ? "Y" : "N", stringHeapObj.getStringForCode(strRef));
+ }
if (storeIt) {
heap[i] = strRef;
}
@@ -1809,7 +1854,7 @@ public class BinaryCasSerDes6 {
*/
private void readByKind(int iHeap, int offset, SlotKind kind, boolean storeIt) throws IOException {
- if (offset == -1) {
+ if (offset == 0) {
storeIt = false;
}
switch (kind) {
@@ -1842,7 +1887,7 @@ public class BinaryCasSerDes6 {
}
break;
case Slot_LongRef: {
- long v = readLongOrDouble(kind, (iPrevHeap == 0) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
+ long v = readLongOrDouble(kind, (!storeIt || (iPrevHeap == 0)) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset]));
if (v == 0L) {
if (longZeroIndex == -1) {
longZeroIndex = longHeapObj.addLong(0L);
@@ -1954,7 +1999,7 @@ public class BinaryCasSerDes6 {
byte_dis.readFully(byteHeapObj.heap, startPos, length);
return startPos;
} else {
- byte_dis.skipBytes(length);
+ skipBytes(byte_dis, length);
return 0;
}
}
@@ -1970,7 +2015,7 @@ public class BinaryCasSerDes6 {
}
return startPos;
} else {
- short_dis.skipBytes(length * 2);
+ skipBytes(short_dis, length * 2);
return 0;
}
}
@@ -2009,7 +2054,7 @@ public class BinaryCasSerDes6 {
} else {
v = readDiff(kind, 0);
}
- if (isUpdatePrevOK) {
+ if (storeIt && isUpdatePrevOK) {
updatePrevIntValue(iHeap, offset, v);
}
}
@@ -2057,8 +2102,8 @@ public class BinaryCasSerDes6 {
private void skipLong(final int length) throws IOException {
for (int i = 0; i < length; i++) {
- long_High_dis.skipBytes(8);
- long_Low_dis.skipBytes(8);
+ skipBytes(long_High_dis, 8);
+ skipBytes(long_Low_dis, 8);
}
}
@@ -2126,15 +2171,19 @@ public class BinaryCasSerDes6 {
return 0;
}
if (1 == length) {
- if (storeIt) {
+ // always store, in case later offset ref
+// if (storeIt) {
return stringHeapObj.addString("");
- } else {
- return 0;
- }
+// } else {
+// return 0;
+// }
}
if (length < 0) { // in this case, -length is the slot index
if (storeIt) {
+ if (TRACE_STR_ARRAY) {
+ System.out.format("Trace String Array Des ref to offset %,d%n", length);
+ }
return stringTableOffset - length;
} else {
return 0;
@@ -2146,11 +2195,20 @@ public class BinaryCasSerDes6 {
if (debugEOF) {
System.out.format("readString offset = %,d%n", offset);
}
- if (storeIt) {
+ // need to store all strings, because an otherwise skipped one may be referenced
+ // later as an offset into the string table
+// if (storeIt) {
String s = readCommonString[segmentIndex].substring(offset, offset + length - 1);
return stringHeapObj.addString(s);
- } else {
- return 0;
+// } else {
+// return 0;
+// }
+ }
+
+ private void skipBytes(DataInputStream stream, int skipNumber) throws IOException {
+ final int r = stream.skipBytes(skipNumber);
+ if (r == 0) {
+ throw new IOException("0 bytes skipped, causing out-of-synch while deserializing");
}
}
@@ -2188,6 +2246,13 @@ public class BinaryCasSerDes6 {
final int modFSsLength = readVnumber(control_dis);
int prevSeq = 0;
+ if ((modFSsLength > 0) && (allowPreexistingFS == AllowPreexistingFS.disallow)) {
+ CASRuntimeException e = new CASRuntimeException(
+ CASRuntimeException.DELTA_CAS_PREEXISTING_FS_DISALLOWED,
+ new String[] {String.format("%,d pre-existing Feature Structures modified", modFSsLength)});
+ throw e;
+ }
+
// if (isTypeMapping) {
// for (int i = 0; i < AuxHeapsCount; i++) {
// srcHeapIndexOffset[i] = 0;
@@ -2205,7 +2270,7 @@ public class BinaryCasSerDes6 {
iHeap = fsStartIndexes.getSrcAddrFromTgtSeq(seqNbrModified);
if (iHeap < 1) {
- // never happen because delta CAS ts system case, the
+ // never happen because in the delta CAS ts system use-case, the
// target is always a subset of the source
// due to type system merging
throw new RuntimeException("never happen");
@@ -2560,9 +2625,9 @@ public class BinaryCasSerDes6 {
if (isTypeMapping) {
final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(tCode);
- if (tgtFeatOffsets2Src == null ) {
- System.out.println("debug caught");
- }
+// if (tgtFeatOffsets2Src == null ) {
+// System.out.println("debug caught");
+// }
for (int i = 0; i < tgtFeatOffsets2Src.length; i++) {
final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1; // add one for origin 1
if (featOffsetInSrc == 0) {
@@ -2693,6 +2758,8 @@ public class BinaryCasSerDes6 {
* Compare 2 CASes, with perhaps different type systems.
* If the type systems are different, construct a type mapper and use that
* to selectively ignore types or features not in other type system
+ *
+ * The Mapper filters C1 -> C2.
*
* Compare only feature structures reachable via indexes or refs
* The order must match
@@ -2820,11 +2887,11 @@ public class BinaryCasSerDes6 {
continue;
}
if ((tCode1_2 == 0) && (tCode2_1 != 0)) {
- i2++;
+ i1++;
continue;
}
if ((tCode1_2 != 0) && (tCode2_1 == 0)) {
- i1++;
+ i2++;
continue;
}
} else { // not type mapping
@@ -2837,7 +2904,7 @@ public class BinaryCasSerDes6 {
}
}
- if (i1 >= c1FoundFSs.length && i1 >= c2FoundFSs.length) {
+ if (i1 >= c1FoundFSs.length && i2 >= c2FoundFSs.length) {
return true; // end, everything compared
}
if (isTypeMapping) {
@@ -3083,6 +3150,7 @@ public class BinaryCasSerDes6 {
private StringBuilder dumpHeapFs(CASImpl cas, final int iHeap, final TypeSystemImpl ts) {
StringBuilder sb = new StringBuilder();
typeInfo = ts.getTypeInfo(cas.getHeap().heap[iHeap]);
+ sb.append("Heap Addr: ").append(iHeap).append(' ');
sb.append(typeInfo).append(' ');
if (typeInfo.isHeapStoredArray) {
@@ -3415,7 +3483,10 @@ public class BinaryCasSerDes6 {
}
}
- private void readHeader() throws IOException {
+ private void readHeader(InputStream istream) throws IOException {
+ deserIn = (istream instanceof DataInputStream) ? (DataInputStream) istream
+ : new DataInputStream(istream);
+
// key
// determine if byte swap if needed based on key
byte[] bytebuf = new byte[4];
Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java?rev=1466505&r1=1466504&r2=1466505&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java Wed Apr 10 15:07:12 2013
@@ -1200,7 +1200,7 @@ public class CASImpl extends AbstractCas
if (compressedVersion == 0) {
(new BinaryCasSerDes4(this.getTypeSystemImpl(), false)).deserialize(this, dis, delta);
} else {
- (new BinaryCasSerDes6(this, rfs)).deserializeAfterVersion(dis, delta);
+ (new BinaryCasSerDes6(this, rfs)).deserializeAfterVersion(dis, delta, AllowPreexistingFS.allow);
}
return;
}
Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java?rev=1466505&r1=1466504&r2=1466505&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java Wed Apr 10 15:07:12 2013
@@ -224,9 +224,9 @@ public class SerDesTest6 extends TestCas
public SerDesTest6() {
Random sg = new Random();
long seed = sg.nextLong();
-// seed = -2659090483652661635L;
+// seed = 2934127305128325787L;
random = new Random(seed);
-// System.out.format("RandomSeed: %,d%n", seed);
+ System.out.format("RandomSeed: %,d%n", seed);
mSrc = setupTTypeSystem(TwoTypes);
casSrc = mSrc.cas;
@@ -283,6 +283,36 @@ public class SerDesTest6 extends TestCas
public void tearDown() {
}
+ /**
+ * Make one of each kind of artifact, including arrays
+ * serialize to byte stream, deserialize into new cas, compare
+ */
+
+ public void testAllKinds() {
+ if (doPlain) {
+ serdesSimple(getTT(EqTwoTypes));
+ } else {
+ for (TTypeSystem m : alternateTTypeSystems) {
+ switch (m.kind){
+ // note: case statements *not* grouped in order to facilitate debugging
+ case OneTypeSubsetFeatures:
+ serdesSimple(m);
+ break;
+ case TwoTypesSubsetFeatures:
+ serdesSimple(m);
+ break;
+ case TwoTypes:
+ case EqTwoTypes:
+ case OneType:
+ case TwoTypesNoFeatures:
+ serdesSimple(m);
+ break;
+ }
+ }
+ }
+ }
+
+
// Test chains going through filtered type
// Repeat below with OneType, and TwoTypes with filtered slot == fsRef
@@ -388,39 +418,17 @@ public class SerDesTest6 extends TestCas
verifyDelta(marker, ri);
}
- /**
- * Make one of each kind of artifact, including arrays
- * serialize to byte stream, deserialize into new cas, compare
- */
-
- public void testAllKinds() {
- if (doPlain) {
- serdesSimple(getTT(EqTwoTypes));
- } else {
- for (TTypeSystem m : alternateTTypeSystems) {
- switch (m.kind){
- case OneTypeSubsetFeatures:
- serdesSimple(m);
- break;
- case TwoTypesSubsetFeatures:
- serdesSimple(m);
- break;
- case TwoTypes:
- case EqTwoTypes:
- case OneType:
- case TwoTypesNoFeatures:
- serdesSimple(m);
- break;
- }
- }
- }
- }
private void serdesSimple(TTypeSystem m) {
remoteCas = setupCas(m);
casSrc.reset();
loadCas(casSrc, mSrc);
- verify(remoteCas);
+ verify(remoteCas);
+
+ // test case where serialization is done without type filtering,
+ // and deserialization is done with filtering
+ remoteCas.reset();
+ verifyDeserFilter(remoteCas);
}
/**
@@ -1131,6 +1139,29 @@ public class SerDesTest6 extends TestCas
throw new RuntimeException(e);
}
}
+
+ private void verifyDeserFilter(CASImpl casTgt) {
+ // serialize w/o filter
+ BinaryCasSerDes6 bcs = null;
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+ if (doPlain) {
+ return;
+ } else {
+ bcs = new BinaryCasSerDes6(casSrc, (ReuseInfo) null);
+ bcs.serialize(baos);
+ }
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ bcs = new BinaryCasSerDes6(casTgt, null, casSrc.getTypeSystemImpl());
+ bcs.deserialize(bais);
+
+ bcs = new BinaryCasSerDes6(casSrc, null, casTgt.getTypeSystemImpl());
+ assertTrue(bcs.compareCASes(casSrc, casTgt));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
// casSrc -> remoteCas
private ReuseInfo[] serializeDeserialize(