You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2014/11/20 20:28:18 UTC
svn commit: r1640786 -
/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
Author: schor
Date: Thu Nov 20 19:28:17 2014
New Revision: 1640786
URL: http://svn.apache.org/r1640786
Log:
[UIMA-4100] support proper below-the-line updates for Binary CAS Delta Deserialization. Support general remove from all views if there are corruptable indices
Modified:
uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java?rev=1640786&r1=1640785&r2=1640786&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java Thu Nov 20 19:28:17 2014
@@ -77,6 +77,7 @@ import org.apache.uima.cas.admin.CASMgr;
import org.apache.uima.cas.admin.FSIndexComparator;
import org.apache.uima.cas.admin.FSIndexRepositoryMgr;
import org.apache.uima.cas.admin.TypeSystemMgr;
+import org.apache.uima.cas.impl.FSIndexRepositoryImpl.IndexRepoTodos;
import org.apache.uima.cas.text.AnnotationFS;
import org.apache.uima.cas.text.AnnotationIndex;
import org.apache.uima.cas.text.Language;
@@ -159,7 +160,7 @@ public class CASImpl extends AbstractCas
// fields shared among all CASes belong to views of a common base CAS
private static class SharedViewData {
- private Heap heap;
+ final private Heap heap;
// private SymbolTable stringTable;
// private ArrayList stringList;
@@ -262,11 +263,13 @@ public class CASImpl extends AbstractCas
IS_CHK_FS_UPDATE_CORRUPTS_INDEX ?
new PositiveIntSet_impl() : null;
- private SharedViewData(boolean useFSCache) {
+
+ private SharedViewData(boolean useFSCache, Heap heap) {
this.useFSCache = useFSCache;
+ this.heap = heap;
}
}
-
+
// -----------------------------------------------------
// Non-shared instance data for base CAS and each view
// -----------------------------------------------------
@@ -365,12 +368,12 @@ public class CASImpl extends AbstractCas
// FSClassRegistry instances
}
- this.svd = new SharedViewData(useFSCache);
+ this.svd = new SharedViewData(useFSCache, new Heap(initialHeapSize));
this.svd.casMetadata = ts.casMetadata;
this.svd.baseCAS = this;
// Set up new heaps
- this.svd.heap = new Heap(initialHeapSize);
+// this.svd.heap = new Heap(initialHeapSize);
this.svd.stringHeap = new StringHeap();
// initial size 16
this.svd.byteHeap = new ByteHeap();
@@ -429,13 +432,15 @@ public class CASImpl extends AbstractCas
}
// get the indexRepository for this Sofa
- this.indexRepository = (this.mySofaRef == -1) ? (FSIndexRepositoryImpl) cas
- .getSofaIndexRepository(1) : (FSIndexRepositoryImpl) cas.getSofaIndexRepository(aSofa);
+ this.indexRepository = (this.mySofaRef == -1) ?
+ (FSIndexRepositoryImpl) cas.getSofaIndexRepository(1) :
+ (FSIndexRepositoryImpl) cas.getSofaIndexRepository(aSofa);
if (null == this.indexRepository) {
// create the indexRepository for this CAS
// use the baseIR to create a lightweight IR copy
- this.indexRepository = new FSIndexRepositoryImpl(this, (FSIndexRepositoryImpl) cas
- .getBaseIndexRepository());
+ this.indexRepository = new FSIndexRepositoryImpl(
+ this,
+ (FSIndexRepositoryImpl) cas.getBaseIndexRepository());
this.indexRepository.commit();
// save new sofa index
if (this.mySofaRef == -1) {
@@ -1179,6 +1184,22 @@ public class CASImpl extends AbstractCas
reinitIndexedFSs(fsIndex);
}
+
+ /**
+ * Binary Deserialization Support
+ * An instance of this class is made for each binary deserialization and shared with its helper methods
+ *
+ */
+ private static class BinDeserSupport {
+
+ private int fsStartAddr;
+ private int fsEndAddr;
+ private int[] fssAddrArray;
+ private int fssIndex;
+ private int lastRemovedFsAddr;
+ private List<FSIndexRepositoryImpl> indexRepos = new ArrayList<FSIndexRepositoryImpl>(4);
+ }
+
/**
* --------------------------------------------------------------------- see
* Blob Format in CASSerializer
@@ -1199,6 +1220,7 @@ public class CASImpl extends AbstractCas
final DataInputStream dis = (istream instanceof DataInputStream) ?
(DataInputStream) istream : new DataInputStream(istream);
+ final BinDeserSupport bds = new BinDeserSupport();
try {
// key
// determine if byte swap if needed based on key
@@ -1247,6 +1269,7 @@ public class CASImpl extends AbstractCas
this.getHeap().grow(fsheapsz);
}
+ // add new heap slots
for (int i = startPos; i < fsheapsz+startPos; i++) {
this.getHeap().heap[i] = readInt(dis, swap);
}
@@ -1290,10 +1313,32 @@ public class CASImpl extends AbstractCas
//if delta, handle modified fs heap cells
if (delta) {
+
+ final int heapsize = this.getHeap().getNextId();
+
+ // compute table of ints which correspond to FSs in the existing heap
+ // we need this because the list of modifications is just arbitrary single words on the heap
+ // at arbitrary boundaries
+ IntVector fss = new IntVector(Math.max(128, heapsize >> 6));
+ int fsAddr;
+ for (fsAddr = 1; fsAddr < heapsize; fsAddr = getNextFsHeapAddr(fsAddr)) {
+ fss.add(fsAddr);
+ }
+ fss.add(fsAddr); // add trailing value
+ bds.fssAddrArray = fss.toArray();
+
int fsmodssz = readInt(dis, swap);
+ bds.fsStartAddr = -1;
+ final ArrayList<FSIndexRepositoryImpl> indexRepos = new ArrayList<FSIndexRepositoryImpl>();
+
+
for (int i = 0; i < fsmodssz; i++) {
- this.getHeap().heap[readInt(dis, swap)] = readInt(dis, swap);
+ final int heapAddrBeingModified = readInt(dis, swap);
+ maybeAddBackAndRemoveFs(heapAddrBeingModified, bds);
+ this.getHeap().heap[heapAddrBeingModified] = readInt(dis, swap);
}
+ addBackRemovedFsToAppropViews(bds.lastRemovedFsAddr, indexRepos);
+ bds.fssAddrArray = null; // free storage
}
// indexed FSs
@@ -1417,6 +1462,82 @@ public class CASImpl extends AbstractCas
return SerialFormat.BINARY;
}
+ /**
+ * for Deserialization of Delta, when updating existing FSs,
+ * If the heap addr is for the next FS, re-add the previous one to those indices where it was removed,
+ * and then remove the new one (and remember which views to re-add to).
+ * @param heapAddr
+ */
+ private void maybeAddBackAndRemoveFs(int heapAddr, final BinDeserSupport bds) {
+ if (bds.fsStartAddr == -1) {
+ bds.fssIndex = -1;
+ bds.lastRemovedFsAddr = -1;
+ }
+ findCorrespondingFs(heapAddr, bds); // sets fsStartAddr, end addr
+ if (bds.lastRemovedFsAddr != bds.fsStartAddr) {
+ addBackRemovedFsToAppropViews(bds.lastRemovedFsAddr, bds.indexRepos);
+ removeFromCorruptableIndexAnyView(bds.lastRemovedFsAddr = bds.fsStartAddr, bds.indexRepos);
+ }
+ }
+
+ void addBackRemovedFsToAppropViews(int fsAddr, List<FSIndexRepositoryImpl> indexRepos) {
+ for (FSIndexRepositoryImpl ir : indexRepos) {
+ ir.addFS(fsAddr);
+ }
+ indexRepos.clear();
+ }
+
+ private void findCorrespondingFs(int heapAddr, final BinDeserSupport bds) {
+ if (bds.fsStartAddr < heapAddr && heapAddr < bds.fsEndAddr) {
+ return;
+ }
+
+ // search forward by 1 before doing binary search
+ bds.fssIndex ++;
+ bds.fsStartAddr = bds.fssAddrArray[bds.fssIndex]; // must exist
+ if (bds.fssIndex + 1 < bds.fssAddrArray.length) { // handle edge case where prev was at the end
+ bds.fsEndAddr = bds.fssAddrArray[bds.fssIndex + 1]; // must exist
+ if (bds.fsStartAddr < heapAddr && heapAddr < bds.fsEndAddr) {
+ return;
+ }
+ }
+
+ int result;
+ if (heapAddr > bds.fsEndAddr) {
+ // item is higher
+ result = Arrays.binarySearch(bds.fssAddrArray, bds.fssIndex + 1, bds.fssAddrArray.length, heapAddr);
+ } else {
+ result = Arrays.binarySearch(bds.fssAddrArray, 0, bds.fssIndex - 2, heapAddr);
+ }
+
+ // result must be negative - should never modify a type code slot
+ assert (result < 0);
+ bds.fssIndex = (-result) - 2;
+ bds.fsStartAddr = bds.fssAddrArray[bds.fssIndex];
+ bds.fsEndAddr = bds.fssAddrArray[bds.fssIndex + 1];
+ assert(bds.fsStartAddr < heapAddr && heapAddr < bds.fsEndAddr);
+ }
+
+ private int getNextFsHeapAddr(int fsAddr) {
+ final TypeSystemImpl tsi = getTypeSystemImpl();
+ final int typeCode = getTypeCode(fsAddr);
+ final Type type = tsi.ll_getTypeForCode(typeCode);
+ //debug
+// if (tsi.ll_getTypeForCode(typeCode) == null) {
+// System.out.println("debug, typeCode = "+ typeCode);
+// }
+ final boolean isHeapStoredArray = (typeCode == tsi.intArrayTypeCode) || (typeCode == tsi.floatArrayTypeCode)
+ || (typeCode == tsi.fsArrayTypeCode) || (typeCode == tsi.stringArrayTypeCode)
+ || (TypeSystemImpl.isArrayTypeNameButNotBuiltIn(type.getName()));
+ if (isHeapStoredArray) {
+ return fsAddr + 2 + getHeapValue(fsAddr + 1);
+ } else if (type.isArray()) {
+ return fsAddr + 3; // for the aux ref and the length
+ } else {
+ return fsAddr + this.svd.casMetadata.fsSpaceReq[typeCode];
+ }
+ }
+
private long readLong(DataInputStream dis, boolean swap) throws IOException {
long v = dis.readLong();
return swap ? Long.reverseBytes(v) : v;
@@ -1515,63 +1636,83 @@ public class CASImpl extends AbstractCas
}
}
- // fsIndex contains added, removed and reindexed FS per view
+ /**
+ * Adds the SofaFSs to the base view
+ * Assumes "cas" refers to the base cas
+ *
+ * Processes "adds", "removes" and "reindexes" for all views
+ *
+ * @param fsIndex - array of fsRefs and counts, for sofas, and all views
+ */
void reinitDeltaIndexedFSs(int[] fsIndex) {
- // Add FSs to index repository for base CAS
- int numViews = fsIndex[0]; //total number of views
- int loopLen = fsIndex[1]; // number of sofas, not necessarily the same as
- // number of views. Should only contain new Sofas.
- for (int i = 2; i < loopLen + 2; i++) { // iterate over all the sofas,
- this.indexRepository.addFS(fsIndex[i]); // add to base index
- }
- int loopStart = loopLen + 2;
+ assert(this.svd.baseCAS == this);
+ // Add Sofa FSs to index repository for base CAS
+ int numViews = fsIndex[0]; // total number of views
+ int loopLen = fsIndex[1]; // number of sofas, not necessarily the same as number of views (initial view could be missing a Sofa)
+ // add Sofa FSs to the base view's index. The list should only contain new Sofas.
+ for (int i = 2; i < loopLen + 2; i++) { // iterate over all the sofas,
+ this.indexRepository.addFS(fsIndex[i]); // add to base index
+ }
+ int loopStart = loopLen + 2;
- FSIterator<SofaFS> iterator = this.svd.baseCAS.getSofaIterator();
- final Feature idFeat = getTypeSystem().getFeatureByFullName(CAS.FEATURE_FULL_NAME_SOFAID);
- // Add FSs to index repository for each View
- while (iterator.isValid()) {
- SofaFS sofa = (SofaFS) iterator.get();
- String id = getLowLevelCAS().ll_getStringValue(((FeatureStructureImpl) sofa).getAddress(),
- ((FeatureImpl) idFeat).getCode());
- if (CAS.NAME_DEFAULT_SOFA.equals(id)) {
- this.registerInitialSofa();
- this.svd.sofaNameSet.add(id);
- }
- // next line the getView as a side effect
- // checks for dupl sofa name, and if not,
- // adds the name to the sofaNameSet
- ((CASImpl) this.getView(sofa)).registerView(sofa);
+ FSIterator<SofaFS> iterator = this.getSofaIterator();
+ final int idFeatCode = ((FeatureImpl)getTypeSystem().getFeatureByFullName(CAS.FEATURE_FULL_NAME_SOFAID)).getCode();
+
+ // Register all Sofas
+ while (iterator.isValid()) {
+ SofaFS sofa = (SofaFS) iterator.get();
+ String id = getLowLevelCAS().ll_getStringValue(((FeatureStructureImpl) sofa).getAddress(), idFeatCode);
+ if (CAS.NAME_DEFAULT_SOFA.equals(id)) {
+ this.registerInitialSofa();
+ this.svd.sofaNameSet.add(id);
+ }
+ // as a side effect, the getView call on the next line
+ // checks for a duplicate sofa name and, if there is none,
+ // adds the name to the sofaNameSet
+ ((CASImpl) this.getView(sofa)).registerView(sofa);
- iterator.moveToNext();
- }
- this.svd.viewCount = numViews; // total number of views
-
- for (int viewNbr = 1; viewNbr <= numViews; viewNbr++) {
+ iterator.moveToNext();
+ }
+
+ this.svd.viewCount = numViews; // total number of views
+
+ for (int viewNbr = 1; viewNbr <= numViews; viewNbr++) {
CAS view = (viewNbr == 1) ? getInitialView() : getView(viewNbr);
if (view != null) {
+
+ // for all views
+
FSIndexRepositoryImpl loopIndexRep = (FSIndexRepositoryImpl) getSofaIndexRepository(viewNbr);
loopLen = fsIndex[loopStart];
+
+ // add FSs to index
+
for (int i = loopStart + 1; i < loopStart + 1 + loopLen; i++) {
loopIndexRep.addFS(fsIndex[i]);
}
+
+ // remove FSs from indexes
+
loopStart += loopLen + 1;
loopLen = fsIndex[loopStart];
for (int i = loopStart + 1; i < loopStart + 1 + loopLen; i++) {
loopIndexRep.removeFS(fsIndex[i]);
}
+
+ // skip the reindex - this isn't done here https://issues.apache.org/jira/browse/UIMA-4100
+ // but we need to run the loop to read over the items in the input stream
loopStart += loopLen + 1;
loopLen = fsIndex[loopStart];
for (int i = loopStart + 1; i < loopStart + 1 + loopLen; i++) {
- loopIndexRep.removeFS(fsIndex[i]);
- loopIndexRep.addFS(fsIndex[i]);
+// loopIndexRep.removeFS(fsIndex[i]);
+// loopIndexRep.addFS(fsIndex[i]);
}
loopStart += loopLen + 1;
((CASImpl) view).updateDocumentAnnotation();
} else {
loopStart += 1;
}
- }
-
+ }
}
// IndexedFSs format:
@@ -2537,7 +2678,7 @@ public class CASImpl extends AbstractCas
FSIndexComparator comp = this.indexRepository.createComparator();
comp.setType(ts.sofaType);
comp.addKey(ts.sofaNum, FSIndexComparator.STANDARD_COMPARE);
- this.indexRepository.createIndex(comp, CAS.SOFA_INDEX_NAME, FSIndex.SET_INDEX);
+ this.indexRepository.createIndex(comp, CAS.SOFA_INDEX_NAME, FSIndex.BAG_INDEX);
comp = this.indexRepository.createComparator();
comp.setType(ts.annotType);
@@ -3247,6 +3388,65 @@ public class CASImpl extends AbstractCas
}
return false; // not in any view's indexes
}
+
+ /**
+ * Remove the fsRef from any corruptable index in any view, and remember
+ * per view whether it was actually in the index.
+ * @param fsRef -
+ * @param toBeAdded -
+ */
+ void removeFromCorruptableIndexAnyView(final int fsRef, IndexRepoTodos toBeAdded) {
+ final int typeCode = getTypeCode(fsRef);
+ final TypeSystemImpl tsi = getTypeSystemImpl();
+ if (tsi.isAnnotationBaseOrSubtype(typeCode)) {
+ // only need to check one view
+ FSIndexRepositoryImpl ir = ll_getSofaCasView(fsRef).indexRepository;
+ if (ir.removeIfInCorrputableIndexInThisView(fsRef)) {
+ toBeAdded.addTodo(ir, fsRef);
+ }
+ return;
+ }
+
+ // not a subtype of AnnotationBase, need to check all views (except base)
+ // sofas indexed in the base view are not corruptable.
+ final Iterator<CAS> viewIterator = getViewIterator();
+ while (viewIterator.hasNext()) {
+ FSIndexRepositoryImpl repo = (FSIndexRepositoryImpl) viewIterator.next().getIndexRepository();
+ if (repo.removeIfInCorrputableIndexInThisView(fsRef)) {
+ toBeAdded.addTodo(repo, fsRef);
+ }
+ }
+ }
+
+ /**
+ * Remove the fsRef from any corruptable index in any view, and remember
+ * per view whether it was actually in the index.
+ * @param fsRef -
+ * @param todoRepos -
+ */
+ void removeFromCorruptableIndexAnyView(final int fsRef, List<FSIndexRepositoryImpl> todoRepos) {
+ todoRepos.clear();
+ final int typeCode = getTypeCode(fsRef);
+ final TypeSystemImpl tsi = getTypeSystemImpl();
+ if (tsi.isAnnotationBaseOrSubtype(typeCode)) {
+ // only need to check one view
+ FSIndexRepositoryImpl ir = ll_getSofaCasView(fsRef).indexRepository;
+ if (ir.removeIfInCorrputableIndexInThisView(fsRef)) {
+ todoRepos.add(ir);
+ }
+ return;
+ }
+
+ // not a subtype of AnnotationBase, need to check all views (except base)
+ // sofas indexed in the base view are not corruptable.
+ final Iterator<CAS> viewIterator = getViewIterator();
+ while (viewIterator.hasNext()) {
+ FSIndexRepositoryImpl repo = (FSIndexRepositoryImpl) viewIterator.next().getIndexRepository();
+ if (repo.removeIfInCorrputableIndexInThisView(fsRef)) {
+ todoRepos.add(repo);
+ }
+ }
+ }
public final void ll_setIntValue(int fsRef, int featureCode, int value) {
checkForInvalidFeatureSetting(fsRef, featureCode);
@@ -4304,11 +4504,6 @@ public class CASImpl extends AbstractCas
return this;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.uima.cas.CAS#getViewIterator()
- */
// public Iterator<CAS> getViewIterator() {
// List<CAS> viewList = new ArrayList<CAS>();
// // add initial view if it has no sofa
@@ -4323,12 +4518,22 @@ public class CASImpl extends AbstractCas
// return viewList.iterator();
// }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.uima.cas.CAS#getViewIterator()
+ */
public Iterator<CAS> getViewIterator() {
return new Iterator<CAS>() {
final CASImpl initialView = (CASImpl) getInitialView(); // creates one if not existing, w/o sofa
- boolean testInitView = !initialView.mySofaIsValid();
+ boolean testInitView = !initialView.mySofaIsValid(); // true if has no Sofa in initial view
+ // but is reset to false once iterator moves
+ // off of initial view.
+
+ // if initial view has a sofa, we just use the
+ // sofa iterator instead.
final FSIterator<SofaFS> sofaIter = getSofaIterator();