You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2017/10/03 19:34:19 UTC
[33/65] [abbrv] jena git commit: JENA-1397: Rename java packages
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/BPlusTreeRewriter.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/BPlusTreeRewriter.java b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/BPlusTreeRewriter.java
new file mode 100644
index 0000000..eac7a5d
--- /dev/null
+++ b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/BPlusTreeRewriter.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.trans.bplustree.rewriter ;
+
+import static org.apache.jena.dboe.trans.bplustree.rewriter.BPlusTreeRewriterUtils.divider;
+import static org.apache.jena.dboe.trans.bplustree.rewriter.BPlusTreeRewriterUtils.printIndexBlocks;
+import static org.apache.jena.dboe.trans.bplustree.rewriter.BPlusTreeRewriterUtils.summarizeDataBlocks;
+
+import java.util.Iterator ;
+import java.util.List ;
+
+import org.apache.jena.atlas.iterator.Iter ;
+import org.apache.jena.atlas.iterator.IteratorWithBuffer ;
+import org.apache.jena.atlas.lib.Pair ;
+import org.apache.jena.dboe.base.block.BlockMgr;
+import org.apache.jena.dboe.base.buffer.PtrBuffer;
+import org.apache.jena.dboe.base.buffer.RecordBuffer;
+import org.apache.jena.dboe.base.file.BufferChannel;
+import org.apache.jena.dboe.base.record.Record;
+import org.apache.jena.dboe.base.record.RecordFactory;
+import org.apache.jena.dboe.base.recordbuffer.RecordBufferPage;
+import org.apache.jena.dboe.base.recordbuffer.RecordBufferPageMgr;
+import org.apache.jena.dboe.trans.bplustree.*;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+public class BPlusTreeRewriter {
+ static private Logger log = LoggerFactory.getLogger(BPlusTreeRewriter.class) ;
+
+ static boolean rebalance = true ;
+ static boolean debug = false ;
+ static boolean materialize = debug ;
+
+ // Process:
+ // 1/ Take a stream of records and create leaves.
+ // Emit the RecordBufferPage (B+Tree leaves).
+ // 2/ Take a stream of RecordBufferPage and create the first level of
+ // branches.
+ // 3/ Take each branch level and create upper branches until root hit.
+ // 4/ Copy root block to root real location.
+
+ // --------------------------------
+
+ /**
+ * Given a stream of records and details of the B+Tree to build, go and
+ * build it.
+ *
+ * @return A newly built BPlusTree
+ */
+ public static BPlusTree packIntoBPlusTree(Iterator<Record> iterRecords, BPlusTreeParams bptParams, RecordFactory recordFactory,
+ BufferChannel bptState, BlockMgr blkMgrNodes, BlockMgr blkMgrRecords) {
+ // **** Attach to storage.
+ // Small caches as we mostly work on a block then move on.
+ // Only read behind and look ahead actually do any work on an existing
+ // block.
+ // (check this by rerunning with different cache sizes).
+
+ if ( !iterRecords.hasNext() )
+ // No records. Just return a B+Tree.
+ return BPlusTreeFactory.createNonTxn(bptParams, bptState, blkMgrNodes, blkMgrRecords) ;
+
+
+ // Initial B+tree needed to carry parameters around - not legal at this point.
+ BPlusTree bpt2 = BPT.createRootOnlyBPTree(bptParams, bptState, blkMgrNodes, blkMgrRecords) ;
+ // Get the root node.
+ // We will use this slot later and write in the correct root.
+ // The root has to be block zero currently.
+ BPTreeNode root = bpt2.getNodeManager().getWrite(BPlusTreeParams.RootId, BPlusTreeParams.RootParent) ;
+ // ******** Pack data blocks.
+ Iterator<Pair<Integer, Record>> iter = writePackedDataBlocks(iterRecords, bpt2) ;
+
+ // ******** Index layer
+ // Loop until one block only.
+ // Never zero blocks.
+ // Output is a single pair pointing to the root - but the root is in the
+ // wrong place.
+
+ boolean leafLayer = true ;
+ while (true) {
+ iter = genTreeLevel(iter, bpt2, leafLayer) ;
+ // Advances iter.
+ IteratorWithBuffer<Pair<Integer, Record>> iter2 = new IteratorWithBuffer<>(iter, 2) ;
+ boolean singleBlock = (iter2.peek(1) == null) ;
+ // Having peeked ahead, use the real stream.
+ iter = iter2 ;
+ if ( singleBlock )
+ break ;
+ leafLayer = false ;
+ }
+
+ // ******** Put root in right place.
+ Pair<Integer, Record> pair = iter.next() ;
+ if ( iter.hasNext() ) {
+ log.error("**** Building index layers didn't result in a single block") ;
+ return null ;
+ }
+ fixupRoot(root, pair, bpt2) ;
+ // ****** Finish the tree.
+ //bpt2.getStateManager().
+ blkMgrNodes.sync() ;
+ blkMgrRecords.sync() ;
+ return bpt2 ;
+ }
+
+ // **** data block phase
+
+ /** Pack record blocks into linked RecordBufferPages */
+ private static Iterator<Pair<Integer, Record>> writePackedDataBlocks(Iterator<Record> records, final BPlusTree bpt) {
+ if ( debug ) {
+ divider() ;
+ System.out.println("---- Data level") ;
+ }
+
+ final RecordBufferPageMgr mgr = bpt.getRecordsMgr().getRecordBufferPageMgr() ;
+ Iterator<RecordBufferPage> iter = new RecordBufferPageLinker(new RecordBufferPagePacker(records, mgr)) ;
+
+ // Write and convert to split pairs.
+ Iterator<Pair<Integer, Record>> iter2 = Iter.map(iter, rbp-> {
+ mgr.put(rbp) ;
+ Record r = rbp.getRecordBuffer().getHigh() ;
+ r = bpt.getRecordFactory().createKeyOnly(r) ;
+ return new Pair<>(rbp.getId(), r) ;
+ }) ;
+
+ if ( debug ) {
+ if ( rebalance )
+ System.out.println("Before rebalance (data)") ;
+ iter2 = summarizeDataBlocks(iter2, bpt.getRecordsMgr().getRecordBufferPageMgr()) ;
+ // iter2 = printDataBlocks(iter2,
+ // bpt.getRecordsMgr().getRecordBufferPageMgr()) ;
+ }
+
+ if ( rebalance )
+ iter2 = new RebalenceDataEnd(iter2, bpt) ;
+
+ // Testing - materialize - debug wil have done this
+ if ( materialize && !debug )
+ iter2 = Iter.toList(iter2).iterator() ;
+
+ if ( debug && rebalance ) {
+ System.out.println("After rebalance (data)") ;
+ iter2 = summarizeDataBlocks(iter2, bpt.getRecordsMgr().getRecordBufferPageMgr()) ;
+ // iter2 = printDataBlocks(iter2,
+ // bpt.getRecordsMgr().getRecordBufferPageMgr()) ;
+ }
+ return iter2 ;
+ }
+
+ private static class RebalenceDataEnd extends RebalenceBase {
+ private RecordBufferPageMgr mgr ;
+ private BPlusTree bpt ;
+
+ public RebalenceDataEnd(Iterator<Pair<Integer, Record>> iter, BPlusTree bpt) {
+ super(iter) ;
+ this.bpt = bpt ;
+ }
+
+ @Override
+ protected Record rebalance(int id1, Record r1, int id2, Record r2) {
+ RecordBufferPageMgr mgr = bpt.getRecordsMgr().getRecordBufferPageMgr() ;
+ RecordBufferPage page1 = mgr.getWrite(id1) ;
+ RecordBufferPage page2 = mgr.getWrite(id2) ;
+
+ // Wrong calculatation.
+ for ( int i = page2.getCount() ; i < page1.getMaxSize() / 2 ; i++ ) {
+ // shiftOneup(node1, node2) ;
+ Record r = page1.getRecordBuffer().getHigh() ;
+ page1.getRecordBuffer().removeTop() ;
+
+ page2.getRecordBuffer().add(0, r) ;
+ }
+
+ mgr.put(page1) ;
+ mgr.put(page2) ;
+
+ Record splitPoint = page1.getRecordBuffer().getHigh() ;
+ splitPoint = bpt.getRecordFactory().createKeyOnly(splitPoint) ;
+ // Record splitPoint = node1.maxRecord() ;
+ return splitPoint ;
+ }
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ // **** tree block phase
+
+ /*
+ * Status: need to address the parent issue - how to set the parent as we
+ * work up the BPT. One way is to work in memory and assume that there is
+ * enough room for the B+Tree nodes (not the record nodes).
+ *
+ * Another way is to rely on the caching and two-pass each new layer to fix
+ * up parents.
+ *
+ * 3rd way - parents aren't actually used in B+Trees. Set the BPT code to
+ * ignore parent of -2. Always set the new nodes to -2 for a rebuild.
+ */
+
+ // Idea a layer processor for BPT nodes has a sink that is the parent.
+ // Need to rebal the last two blocks of a level.
+ // Same for data blocks.
+
+ // ---- Block stream to BTreeNodeStream.
+
+ private static Iterator<Pair<Integer, Record>> genTreeLevel(Iterator<Pair<Integer, Record>> iter, BPlusTree bpt, boolean leafLayer) {
+ if ( debug ) {
+ divider() ;
+ System.out.println("---- Index level") ;
+ List<Pair<Integer, Record>> x = Iter.toList(iter) ;
+ System.out.println(x) ;
+ iter = x.iterator() ;
+ }
+
+ Iterator<Pair<Integer, Record>> iter2 = new BPTreeNodeBuilder(iter, bpt.getNodeManager(), leafLayer, bpt.getRecordFactory()) ;
+
+ if ( debug ) {
+ if ( rebalance )
+ System.out.println("Before rebalance (index)") ;
+ // iter2 = summarizeIndexBlocks(iter2, bpt.getNodeManager()) ;
+ iter2 = printIndexBlocks(iter2, bpt.getNodeManager()) ;
+ }
+
+ if ( rebalance )
+ iter2 = new RebalenceIndexEnd(iter2, bpt, leafLayer) ;
+
+ if ( materialize && !debug )
+ iter2 = Iter.toList(iter2).iterator() ;
+
+ if ( debug && rebalance ) {
+ System.out.println("After rebalance (index)") ;
+ // iter2 = summarizeIndexBlocks(iter2, bpt.getNodeManager()) ;
+ iter2 = printIndexBlocks(iter2, bpt.getNodeManager()) ;
+ }
+ return iter2 ;
+ }
+
+ private abstract static class RebalenceBase extends IteratorWithBuffer<Pair<Integer, Record>> {
+ protected RebalenceBase(Iterator<Pair<Integer, Record>> iter) {
+ super(iter, 2) ;
+ }
+
+ @Override
+ protected final void endReachedInner() {
+ Pair<Integer, Record> pair1 = peek(0) ;
+ Pair<Integer, Record> pair2 = peek(1) ;
+ if ( pair1 == null || pair2 == null )
+ // Insufficient blocks to repack.
+ return ;
+
+ if ( debug )
+ System.out.printf("Rebalance: %s %s\n", pair1, pair2) ;
+ Record newSplitPoint = rebalance(pair1.car(), pair1.cdr(), pair2.car(), pair2.cdr()) ;
+ // Needed??
+ if ( newSplitPoint != null ) {
+ if ( debug )
+ System.out.println("Reset split point: " + pair1.cdr() + " => " + newSplitPoint) ;
+ pair1 = new Pair<>(pair1.car(), newSplitPoint) ;
+ if ( debug )
+ System.out.printf(" %s %s\n", pair1, pair2) ;
+ set(0, pair1) ;
+ }
+ }
+
+ protected abstract Record rebalance(int id1, Record r1, int id2, Record r2) ;
+ }
+
+ private static class RebalenceIndexEnd extends RebalenceBase {
+ private BPlusTree bpt ;
+
+ public RebalenceIndexEnd(Iterator<Pair<Integer, Record>> iter, BPlusTree bpt, boolean leafLayer) {
+ super(iter) ;
+ this.bpt = bpt ;
+ }
+
+ @Override
+ protected Record rebalance(int id1, Record r1, int id2, Record r2) {
+ BPTreeNodeMgr mgr = bpt.getNodeManager() ;
+ BPTreeNode node1 = mgr.getWrite(id1) ;
+ BPTreeNode node2 = mgr.getWrite(id2) ;
+
+ // rebalence
+ // ** Need rebalance of data leaf layer.
+ int count = node2.getCount() ;
+ if ( count >= bpt.getParams().getMinRec() )
+ return null ;
+
+ Record splitPoint = r1 ;
+
+ // Shift up all in one go and use .set.
+ // Convert to block move ; should be code in BPTreeNode to do this
+ // (insert).
+ for ( int i = count ; i < bpt.getParams().getMinRec() ; i++ ) {
+
+ Record r = splitPoint ;
+
+ // shiftOneup(node1, node2) ;
+ int ptr = node1.getPtrBuffer().getHigh() ;
+ splitPoint = node1.getRecordBuffer().getHigh() ;
+
+ node1.getPtrBuffer().removeTop() ;
+ node1.getRecordBuffer().removeTop() ;
+ node1.setCount(node1.getCount() - 1) ;
+
+ node2.getPtrBuffer().add(0, ptr) ;
+ node2.getRecordBuffer().add(0, r) ;
+ node2.setCount(node2.getCount() + 1) ;
+
+ // Need high of moved substree.
+
+ if ( debug )
+ System.out.printf("-- Shift up: %d %s\n", ptr, r) ;
+ }
+ mgr.put(node1) ;
+ mgr.put(node2) ;
+
+ return splitPoint ;
+ }
+ }
+
+ private static void fixupRoot(BPTreeNode root, Pair<Integer, Record> pair, BPlusTree bpt2) {
+ root.getPtrBuffer().clear() ;
+ root.getRecordBuffer().clear() ;
+
+ if ( BPlusTreeRewriter.debug ) {
+ divider() ;
+ System.out.printf("** Process root: %s\n", pair) ;
+ }
+
+ // Node or records?
+ // BPTreeNode => BPTree copy.
+ BPTreeNode node = bpt2.getNodeManager().getRead(pair.car(), BPlusTreeParams.RootParent) ;
+ copyBPTreeNode(node, root, bpt2) ;
+ bpt2.getNodeManager().release(node) ;
+ bpt2.getNodeManager().write(root);
+ }
+
+ private static void copyBPTreeNode(BPTreeNode nodeSrc, BPTreeNode nodeDst, BPlusTree bpt2) {
+ PtrBuffer pBuff = nodeSrc.getPtrBuffer() ;
+ pBuff.copy(0, nodeDst.getPtrBuffer(), 0, pBuff.getSize()) ;
+ RecordBuffer rBuff = nodeSrc.getRecordBuffer() ;
+ rBuff.copy(0, nodeDst.getRecordBuffer(), 0, rBuff.getSize()) ;
+ nodeDst.setCount(nodeSrc.getCount()) ;
+ nodeDst.setIsLeaf(nodeSrc.isLeaf()) ;
+ bpt2.getNodeManager().put(nodeDst) ;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/BPlusTreeRewriterUtils.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/BPlusTreeRewriterUtils.java b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/BPlusTreeRewriterUtils.java
new file mode 100644
index 0000000..8787cf4
--- /dev/null
+++ b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/BPlusTreeRewriterUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.trans.bplustree.rewriter ;
+
+import java.util.Iterator ;
+import java.util.List ;
+
+import org.apache.jena.atlas.iterator.Iter ;
+import org.apache.jena.atlas.lib.Pair ;
+import org.apache.jena.dboe.base.record.Record;
+import org.apache.jena.dboe.base.recordbuffer.RecordBufferPage;
+import org.apache.jena.dboe.base.recordbuffer.RecordBufferPageMgr;
+import org.apache.jena.dboe.trans.bplustree.BPTreeNode;
+import org.apache.jena.dboe.trans.bplustree.BPTreeNodeMgr;
+import org.apache.jena.dboe.trans.bplustree.BPlusTreeParams;
+
+class BPlusTreeRewriterUtils {
+ static String divider = "----------------------------------------" ;
+ static String nextDivider = null ;
+
+ static Iterator<Pair<Integer, Record>> summarizeDataBlocks(Iterator<Pair<Integer, Record>> iter, RecordBufferPageMgr recordPageMgr) {
+ divider() ;
+ List<Pair<Integer, Record>> pairs = Iter.toList(iter) ;
+ System.out.println("summarizeDataBlocks: " + pairs) ;
+ for ( Pair<Integer, Record> pair : pairs ) {
+ RecordBufferPage rbp = recordPageMgr.getRead(pair.car()) ;
+ System.out.printf("%s -- RecordBufferPage[id=%d,link=%d] (%d) -> [%s]\n", pair, rbp.getId(), rbp.getLink(), rbp.getCount(),
+ rbp.getRecordBuffer().getHigh()) ;
+ recordPageMgr.release(rbp) ;
+ }
+ return pairs.iterator() ;
+ }
+
+ static Iterator<Pair<Integer, Record>> summarizeIndexBlocks(Iterator<Pair<Integer, Record>> iter2, BPTreeNodeMgr bptNodeMgr) {
+ divider() ;
+ List<Pair<Integer, Record>> pairs = Iter.toList(iter2) ;
+ for ( Pair<Integer, Record> pair : pairs ) {
+ BPTreeNode bpNode = bptNodeMgr.getRead(pair.car(), BPlusTreeParams.RootParent) ;
+
+ String hr = "null" ;
+ if ( !bpNode.getRecordBuffer().isEmpty() )
+ hr = bpNode.getRecordBuffer().getHigh().toString() ;
+
+ System.out.printf("%s -- BPTreeNode: %d (%d) -> [%s]\n", pair, bpNode.getId(), bpNode.getCount(), hr) ;
+ bptNodeMgr.release(bpNode) ;
+ }
+ return pairs.iterator() ;
+ }
+
+ private static Iterator<Pair<Integer, Record>> printDataBlocks(Iterator<Pair<Integer, Record>> iter, RecordBufferPageMgr recordPageMgr) {
+ divider() ;
+ List<Pair<Integer, Record>> pairs = Iter.toList(iter) ;
+ System.out.printf(">>Packed data blocks\n") ;
+ for ( Pair<Integer, Record> pair : pairs ) {
+ System.out.printf(" %s\n", pair) ;
+ RecordBufferPage rbp = recordPageMgr.getRead(pair.car()) ;
+ // System.out.printf("RecordBufferPage[id=%d,link=%d] %d\n",
+ // rbp.getId(), rbp.getLink(), rbp.getCount() ) ;
+ System.out.println(rbp) ;
+ recordPageMgr.release(rbp) ;
+ }
+ System.out.printf("<<Packed data blocks\n") ;
+ System.out.printf("Blocks: %d\n", pairs.size()) ;
+ return pairs.iterator() ;
+ }
+
+ static Iterator<Pair<Integer, Record>> printIndexBlocks(Iterator<Pair<Integer, Record>> iter2, BPTreeNodeMgr bptNodeMgr) {
+ divider() ;
+ List<Pair<Integer, Record>> pairs = Iter.toList(iter2) ;
+ System.out.printf(">>Packed index blocks\n") ;
+ for ( Pair<Integer, Record> pair : pairs ) {
+ System.out.printf(" %s\n", pair) ;
+ BPTreeNode bpNode = bptNodeMgr.getRead(pair.car(), BPlusTreeParams.RootParent) ;
+ bpNode.setIsLeaf(true) ;
+ System.out.printf("BPTreeNode: %d\n", bpNode.getId()) ;
+ System.out.println(bpNode) ;
+ bptNodeMgr.release(bpNode) ;
+ }
+ System.out.printf("<<Packed index blocks\n") ;
+ return pairs.iterator() ;
+ }
+
+ static void divider() {
+ if ( nextDivider != null )
+ System.out.println(nextDivider) ;
+ nextDivider = divider ;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/RecordBufferPageLinker.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/RecordBufferPageLinker.java b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/RecordBufferPageLinker.java
new file mode 100644
index 0000000..0ffb1e5
--- /dev/null
+++ b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/RecordBufferPageLinker.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.trans.bplustree.rewriter ;
+
+import java.util.Iterator ;
+import java.util.NoSuchElementException ;
+
+import org.apache.jena.atlas.iterator.PeekIterator ;
+import org.apache.jena.dboe.base.recordbuffer.RecordBufferPage;
+
+/**
+ * From a stream of RecordBufferPage, manage the link fields. That is, be a one
+ * slot delay so that the "link" field can point to the next page. Be careful
+ * about the last block.
+ *
+ */
+class RecordBufferPageLinker implements Iterator<RecordBufferPage> {
+ PeekIterator<RecordBufferPage> peekIter ;
+
+ RecordBufferPage slot = null ;
+
+ RecordBufferPageLinker(Iterator<RecordBufferPage> iter) {
+ if ( !iter.hasNext() ) {
+ peekIter = null ;
+ return ;
+ }
+
+ peekIter = new PeekIterator<>(iter) ;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if ( slot != null )
+ return true ;
+
+ if ( peekIter == null )
+ return false ;
+
+ if ( !peekIter.hasNext() ) {
+ peekIter = null ;
+ return false ;
+ }
+
+ slot = peekIter.next() ;
+ RecordBufferPage nextSlot = peekIter.peek() ;
+ // If null, no slot ahead so no linkage field to set.
+ if ( nextSlot != null )
+ // Set the slot to the id of the next one
+ slot.setLink(nextSlot.getId()) ;
+ return true ;
+ }
+
+ @Override
+ public RecordBufferPage next() {
+ if ( !hasNext() )
+ throw new NoSuchElementException() ;
+ RecordBufferPage rbp = slot ;
+ slot = null ;
+ return rbp ;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException() ;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/RecordBufferPagePacker.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/RecordBufferPagePacker.java b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/RecordBufferPagePacker.java
new file mode 100644
index 0000000..84269a9
--- /dev/null
+++ b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/bplustree/rewriter/RecordBufferPagePacker.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.trans.bplustree.rewriter ;
+
+import java.util.Iterator ;
+import java.util.NoSuchElementException ;
+
+import org.apache.jena.dboe.base.buffer.RecordBuffer;
+import org.apache.jena.dboe.base.record.Record;
+import org.apache.jena.dboe.base.recordbuffer.RecordBufferPage;
+import org.apache.jena.dboe.base.recordbuffer.RecordBufferPageMgr;
+
+/**
+ * Iterate over a stream of records, packing them into RecordBufferPage -- the
+ * leaf of a B+Tree This class does not write the blocks back to the block
+ * manager. This class does allocate block ids and blocks.
+ *
+ * @see RecordBufferPageLinker
+ */
+
+class RecordBufferPagePacker implements Iterator<RecordBufferPage> {
+ Iterator<Record> records = null ;
+ RecordBufferPage recordBufferPage = null ;
+ RecordBufferPageMgr rbMgr = null ;
+
+ RecordBufferPagePacker(Iterator<Record> records, RecordBufferPageMgr rbMgr) {
+ this.records = records ;
+ this.rbMgr = rbMgr ;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if ( recordBufferPage == null ) {
+ if ( records == null )
+ return false ;
+
+ if ( !records.hasNext() ) {
+ records = null ;
+ return false ;
+ }
+ // At least one record to be processed.
+ // No pending RecordBufferPage
+ // ==> There will be a RecordBufferPage to yield.
+
+ // int id = rbMgr.allocateId() ;
+ // //System.out.println("Allocate : "+id) ;
+ recordBufferPage = rbMgr.create() ;
+
+ RecordBuffer rb = recordBufferPage.getRecordBuffer() ;
+ while (!rb.isFull() && records.hasNext()) {
+ Record r = records.next() ;
+ rb.add(r) ;
+ }
+ if ( !records.hasNext() )
+ records = null ;
+ return true ;
+ }
+ return true ;
+
+ }
+
+ @Override
+ public RecordBufferPage next() {
+ if ( !hasNext() )
+ throw new NoSuchElementException() ;
+ RecordBufferPage rbp = recordBufferPage ;
+ recordBufferPage = null ;
+ return rbp ;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException() ;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
new file mode 100644
index 0000000..1f838d2
--- /dev/null
+++ b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.trans.data;
+
+import java.nio.ByteBuffer ;
+import java.util.concurrent.atomic.AtomicLong ;
+
+import org.apache.jena.atlas.RuntimeIOException ;
+import org.apache.jena.atlas.io.IO ;
+import org.apache.jena.dboe.base.file.BinaryDataFile;
+import org.apache.jena.dboe.base.file.BufferChannel;
+import org.apache.jena.dboe.transaction.txn.*;
+import org.apache.jena.query.ReadWrite ;
+
+/** Transactional {@link BinaryDataFile}.
+ * An binary file that is append-only and allows only one writer at a time.
+ * All readers see the file up to the last commit point at the time
+ * they started. The sole writer sees more of the file.
+ */
+
+public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBinaryDataFile.TxnBinFile>
+ implements BinaryDataFile {
+
+ /*
+ * The file is written to as we go along so abort requires some action. We
+ * can't recover from just the file, without any redo or undo recovery
+ * action.
+ */
+
+ private final FileState stateMgr ;
+
+ // The current committed position and the limit as seen by readers.
+ // This is also the abort point.
+ private final AtomicLong committedLength ;
+
+ // The per thread runtime state
+ static class TxnBinFile {
+ final long length ;
+
+ TxnBinFile(long length) {
+ this.length = length ;
+ }
+ }
+
+ // Prepare record
+ static class FileState extends StateMgrData {
+ FileState(BufferChannel bufferChannel, long length, long position) {
+ super(bufferChannel, length, position) ;
+ }
+ private static int idxLength = 0 ;
+ long length() { return get(idxLength) ; }
+ void length(long len) { set(idxLength, len) ; }
+ }
+
+ private final BinaryDataFile binFile ;
+
+ /** Create a transactional BinaryDataFile over a base implementation.
+ * The base file must provide thread-safe operation.
+ */
+ public TransBinaryDataFile(BinaryDataFile binFile, ComponentId cid, BufferChannel bufferChannel) {
+ super(cid) ;
+ stateMgr = new FileState(bufferChannel, 0L, 0L) ;
+ this.binFile = binFile ;
+ if ( ! binFile.isOpen() )
+ binFile.open();
+ // Internal state may be updated by recovery. Start by
+ // setting to the "clean start" settings.
+ committedLength = new AtomicLong(binFile.length()) ;
+ }
+
+ private boolean recoveryAction = false ;
+
+ @Override
+ public void startRecovery() {
+ recoveryAction = false ;
+ }
+
+ @Override
+ public void recover(ByteBuffer ref) {
+ stateMgr.setState(ref);
+ committedLength.set(stateMgr.length()) ;
+ recoveryAction = true ;
+ }
+
+ @Override
+ public void finishRecovery() {
+ if ( recoveryAction ) {
+ long length = committedLength.get() ;
+ binFile.truncate(length) ;
+ binFile.sync();
+ committedLength.set(length) ;
+ }
+ }
+
+ @Override
+ public void cleanStart() { }
+
+ @Override
+ protected TxnBinFile _begin(ReadWrite readWrite, TxnId txnId) {
+ // Atomic read across the two because it's called from within
+ // TransactionCoordinator.begin$ where there is a lock.
+ return createState() ;
+ }
+
+ private TxnBinFile createState() {
+ long xLength = committedLength.get() ;
+ return new TxnBinFile(xLength) ;
+ }
+
+ @Override
+ protected TxnBinFile _promote(TxnId txnId, TxnBinFile state) {
+ return createState() ;
+ }
+
+ @Override
+ protected ByteBuffer _commitPrepare(TxnId txnId, TxnBinFile state) {
+ // Force to disk but do not set the on disk state to record that.
+ binFile.sync();
+ stateMgr.length(binFile.length()) ;
+ return stateMgr.getState() ;
+ }
+
+ @Override
+ protected void _commit(TxnId txnId, TxnBinFile state) {
+ if ( isWriteTxn() ) {
+ // Force to disk happens in _commitPrepare
+ stateMgr.writeState();
+ // Move visible commit point forward (not strictly necessary - transaction is ending.
+ committedLength.set(binFile.length()) ;
+ }
+ }
+
+ @Override
+ protected void _commitEnd(TxnId txnId, TxnBinFile state) {
+ }
+
+ @Override
+ protected void _abort(TxnId txnId, TxnBinFile state) {
+ if ( isWriteTxn() ) {
+ binFile.truncate(committedLength.get()) ;
+ binFile.sync() ;
+ }
+ }
+
+ @Override
+ protected void _complete(TxnId txnId, TxnBinFile state) {}
+
+ @Override
+ protected void _shutdown() {}
+
+ private void checkBoundsReader(long requestedPoint, TxnBinFile state) { }
+
+ @Override
+ public void open() {
+ if ( ! binFile.isOpen() )
+ binFile.open();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return binFile.isOpen() ;
+ }
+
+ @Override
+ public int read(long posn, byte[] b, int start, int length) {
+ checkTxn();
+ if ( isReadTxn() )
+ checkRead(posn) ;
+ return binFile.read(posn, b, start, length) ;
+ }
+
+ private void checkRead(long posn) {
+ if ( posn > getDataState().length )
+ IO.exception("Out of bounds: (limit "+getDataState().length+")"+posn) ;
+ }
+
+ @Override
+ public long write(byte[] b, int start, int length) {
+ checkWriteTxn() ;
+ return binFile.write(b, start, length);
+ }
+
+ /**
+ * Truncate only supported for an abort - this transactional version of
+ * BinaryDataFile will not truncate to earlier than the commited length.
+ */
+ @Override
+ public void truncate(long size) {
+ checkWriteTxn();
+ TxnBinFile state = getDataState() ;
+ if ( size < state.length )
+ throw new RuntimeIOException("truncate("+size+") to smaller than commited length "+state.length) ;
+ binFile.truncate(size) ;
+ }
+
+ @Override
+ public void sync() {
+ // No-op in transactional mode.
+ checkWriteTxn();
+ }
+
+ @Override
+ public void close() {
+ binFile.close() ;
+ }
+
+ @Override
+ public long length() {
+ super.checkTxn();
+ if ( isReadTxn() )
+ // Reader view.
+ return getDataState().length ;
+ return binFile.length() ;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ super.checkTxn();
+ return binFile.isEmpty() ;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBlob.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBlob.java b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBlob.java
new file mode 100644
index 0000000..f84fd16
--- /dev/null
+++ b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBlob.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.dboe.trans.data;
+
+import java.nio.ByteBuffer ;
+import java.util.concurrent.atomic.AtomicReference ;
+
+import org.apache.jena.atlas.RuntimeIOException ;
+import org.apache.jena.atlas.lib.Bytes ;
+import org.apache.jena.dboe.base.file.BufferChannel;
+import org.apache.jena.dboe.transaction.txn.ComponentId;
+import org.apache.jena.dboe.transaction.txn.TransactionalComponentLifecycle;
+import org.apache.jena.dboe.transaction.txn.TxnId;
+import org.apache.jena.query.ReadWrite ;
+
+/** Manage a single binary (not too large) object.
+ * It is written and read from a file in one action,
+ * so changes completely replace the original contents.
+ * The whole object is written to the journal during prepare.
+ */
+public class TransBlob extends TransactionalComponentLifecycle<TransBlob.BlobState> {
+
+ // The last commited state.
+ // Immutable ByteBuffer.
+ private final AtomicReference<ByteBuffer> blobRef = new AtomicReference<>() ;
+ private final BufferChannel file ;
+
+ static class BlobState {
+ boolean hasChanged = false;
+ ByteBuffer $txnBlob ;
+
+ BlobState(ByteBuffer bb) {
+ setByteBuffer(bb) ;
+ }
+ void setByteBuffer(ByteBuffer bb) {
+ $txnBlob = bb ;
+ // Could compare - seems like added complexity.
+ hasChanged = true;
+ }
+
+ ByteBuffer getByteBuffer() { return $txnBlob ; }
+ }
+
+ public TransBlob(ComponentId cid, BufferChannel file) {
+ super(cid) ;
+ this.file = file ;
+ read() ;
+ }
+
+ private void read() {
+ long x = file.size() ;
+ ByteBuffer blob = ByteBuffer.allocate((int)x) ;
+ int len = file.read(blob) ;
+ if ( len != x )
+ throw new RuntimeIOException("Short read: "+len+" of "+x) ;
+ blob.rewind() ;
+ blobRef.set(blob) ;
+ }
+
+ private void write() {
+ ByteBuffer blob = blobRef.get();
+ blob.rewind() ;
+ int x = blob.remaining() ;
+ file.truncate(0);
+ int len = file.write(blob) ;
+ if ( len != x )
+ throw new RuntimeIOException("Short write: "+len+" of "+x) ;
+ file.sync();
+ blob.rewind() ;
+ }
+
+ /** Set the byte buffer.
+ * The byte buffer should not be accessed except by {@link #getBlob}.
+ * We avoid a copy in and copy out - we trust the caller.
+ * The byte buffer should be configured for read if used with {@link #getString}.
+ */
+ public void setBlob(ByteBuffer bb) {
+ checkWriteTxn();
+ getDataState().setByteBuffer(bb);
+ }
+
+ public ByteBuffer getBlob() {
+ if ( isActiveTxn() )
+ return getDataState().getByteBuffer() ;
+ return blobRef.get() ;
+ }
+
+ /** Set data from string - convenience operation */
+ public void setString(String dataStr) {
+ checkWriteTxn();
+ if ( dataStr == null ) {
+ setBlob(null);
+ return ;
+ }
+
+ // Attempt to reuse the write-transaction byte buffer
+ // We can't reuse if it's the blobRef (shared by other transactions)
+ // but if it's a new to this write transaction buffer we can reuse.
+
+ int maxNeeded = dataStr.length()*4 ;
+ ByteBuffer bb = getDataState().getByteBuffer() ;
+ if ( bb == blobRef.get() )
+ bb = ByteBuffer.allocate(maxNeeded) ;
+ else if ( bb.capacity() >= maxNeeded )
+ bb.clear() ;
+ else
+ bb = ByteBuffer.allocate(maxNeeded) ;
+ Bytes.toByteBuffer(dataStr, bb) ;
+ bb.flip() ;
+ setBlob(bb);
+ }
+
+ /** Get data as string - convenience operation */
+ public String getString() {
+ ByteBuffer bb = getBlob() ;
+ if (bb == null )
+ return null ;
+ int x = bb.position() ;
+ String s = Bytes.fromByteBuffer(bb) ;
+ bb.position(x) ;
+ return s ;
+ }
+
+ private boolean recoveryChange = false ;
+ @Override
+ public void startRecovery() {
+ recoveryChange = false ;
+ }
+
+ @Override
+ public void recover(ByteBuffer ref) {
+ blobRef.set(ref) ;
+ recoveryChange = true ;
+ }
+
+ @Override
+ public void finishRecovery() {
+ if ( recoveryChange )
+ write() ;
+ }
+
+ @Override
+ public void cleanStart() { }
+
+ @Override
+ protected BlobState _begin(ReadWrite readWrite, TxnId txnId) {
+ return createState();
+ }
+
+ private BlobState createState() {
+ ByteBuffer blob = blobRef.get() ;
+ // Save reference to ByteBuffer into the transaction state.
+ return new BlobState(blob) ;
+ }
+
+ @Override
+ protected BlobState _promote(TxnId txnId, BlobState state) {
+ // Our write state is the read state.
+ return createState();
+ }
+
+ @Override
+ protected ByteBuffer _commitPrepare(TxnId txnId, BlobState state) {
+ if ( ! state.hasChanged )
+ return null;
+ return state.getByteBuffer() ;
+ }
+
+ @Override
+ protected void _commit(TxnId txnId, BlobState state) {
+ if ( ! state.hasChanged )
+ return;
+ // NB Change reference.
+ blobRef.set(state.getByteBuffer()) ;
+ write() ;
+ }
+
+ @Override
+ protected void _commitEnd(TxnId txnId, BlobState state) {}
+
+ @Override
+ protected void _abort(TxnId txnId, BlobState state) {}
+
+ @Override
+ protected void _complete(TxnId txnId, BlobState state) {}
+
+ @Override
+ protected void _shutdown() {}
+
+ @Override
+ public String toString() { return getComponentId().label() ; }
+
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/AccessPath.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/AccessPath.java b/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/AccessPath.java
deleted file mode 100644
index c2bd220..0000000
--- a/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/AccessPath.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.trans.bplustree;
-
-import java.util.ArrayList ;
-import java.util.List ;
-import java.util.stream.Collectors ;
-
-import org.apache.jena.atlas.lib.InternalErrorException ;
-
-public class AccessPath {
- static class AccessStep {
- final BPTreeNode node ;
- final int idx ;
- final BPTreePage page ;
- AccessStep(BPTreeNode node, int idx, BPTreePage page) {
- this.node = node ;
- this.idx = idx ;
- this.page = page ;
- }
-
- @Override
- public String toString() {
- return "("+node.label()+", "+idx+")->"+page.getId() ;
- }
- }
-
- private final BPTreeNode root ;
- private List<AccessStep> traversed = new ArrayList<>() ;
-
- public AccessPath(BPTreeNode root) {
- this.root = root ;
- }
-
- public void add(BPTreeNode node, int idx, BPTreePage page) {
- traversed.add(new AccessStep(node, idx, page)) ;
- }
-
- public void reset(BPTreeNode node, int idx, BPTreePage page) {
- AccessStep s = traversed.remove(traversed.size()-1) ;
- AccessStep s2 = new AccessStep(node, idx, page) ;
- if ( s.node != s2.node )
- throw new InternalErrorException("Bad attempt to reset: "+this+" with "+s2) ;
- traversed.add(new AccessStep(node, idx, page)) ;
- }
-
- public List<AccessStep> getPath() { return traversed ; }
-
- @Override
- public String toString() {
- return traversed.stream().map(x-> x.toString()).collect(Collectors.toList()).toString() ;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPT.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPT.java b/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPT.java
deleted file mode 100644
index f6485a1..0000000
--- a/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPT.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.trans.bplustree;
-
-import static java.lang.String.format ;
-import static org.apache.jena.atlas.lib.Alg.decodeIndex ;
-
-import java.util.List ;
-import java.util.Optional ;
-
-import org.apache.jena.atlas.lib.InternalErrorException ;
-import org.apache.jena.atlas.logging.FmtLog ;
-import org.seaborne.dboe.base.block.BlockMgr ;
-import org.seaborne.dboe.base.file.BufferChannel ;
-import org.seaborne.dboe.sys.SystemIndex ;
-import org.seaborne.dboe.trans.bplustree.AccessPath.AccessStep ;
-import org.slf4j.Logger ;
-
-/** B+Tree assist functions */
-public final class BPT {
- public static boolean Logging = false ; // Turn on/off logging
-
- public static boolean forcePromoteModes = false ;
- public static boolean promoteDuplicateRecords = false ;
- public static boolean promoteDuplicateNodes = false ;
-
- // Check within BPTreeNode
- public static boolean CheckingNode = false ;
- // Check on exit of B+Tree modifiying operations
- // Not done?
- public static boolean CheckingConcurrency = SystemIndex.Checking ;
-
- /** Enable detailed internal consistency checking */
- public static void checking(boolean onOrOff) {
- CheckingNode = onOrOff ;
- }
-
- /** Dump before and after top level update operations **/
- public static boolean DumpTree = false ;
-
- /** Output a lot of detailed information. */
- public static void infoAll(boolean onOrOff) {
- DumpTree = true ;
- Logging = true ;
- }
-
- static boolean logging(Logger log) {
- return Logging && log.isDebugEnabled() ;
- }
-
- static void log(Logger log, String fmt, Object... args) {
- if ( logging(log) ) {
- FmtLog.debug(log, fmt, args);
- }
- }
-
- static void warning(String msg, Object... args) {
- msg = format(msg, args) ;
- System.out.println("Warning: " + msg) ;
- System.out.flush() ;
- }
-
- static void error(String msg, Object... args) {
- msg = format(msg, args) ;
- System.out.println() ;
- System.out.println(msg) ;
- System.out.flush() ;
- throw new BPTreeException(msg) ;
- }
-
- /** Convert a find index return to the insert location in the array */
- static int apply(int idx) {
- if ( idx >= 0 )
- return idx ;
- return decodeIndex(idx) ;
- }
-
- // ---- Promotion of pages.
-
- /** Promote a single page. Assumes the path to this page has been handled in some way elsewhere */
- static boolean promote1(BPTreePage page, BPTreeNode node, int idx) {
- //System.out.println("promote1: "+page.getBlockMgr().getLabel()+" "+page.getBackingBlock().getId()) ;
- boolean changed = page.promote() ;
- node.ptrs.set(idx, page.getId()) ;
- return changed ;
- }
-
- /** Promote a B+Tree root */
- static boolean promoteRoot(BPTreeNode root) {
- if ( ! root.isRoot() )
- throw new InternalErrorException("Not a root") ;
- boolean changed = root.promote() ;
- root.bpTree.newRoot(root) ;
- return changed ;
- }
-
- /** Promote a B+Tree page */
- static void promotePage(AccessPath path, BPTreePage page) {
- Logger pageLog = page.getLogger() ;
- boolean loggingCall = logging(pageLog) ;
- if ( loggingCall )
- log(pageLog, "Promote :: Path=%s Page=%s", path, page) ;
- // ---- Checking if the access path is consistent.
- if ( BPT.CheckingNode && path != null ) {
- if ( path.getPath().size() > 2) {
- // Check every one except the last is not a leaf node.
- List<AccessStep> y = path.getPath().subList(0, path.getPath().size()-2) ;
- Optional<AccessStep> z = y.stream().filter(e -> e.node.isLeaf() ).findFirst() ;
- if ( z.isPresent() )
- error("promote: Leaf %s found in path but not at the tail: %s") ;
- }
- // Check the page/index pointers
- Optional<AccessStep> z2 = path.getPath().stream().filter(e -> e.node.ptrs.get(e.idx) != e.page.getId()).findFirst() ;
- if ( z2.isPresent() )
- error("promote: path error: %s in %s", z2.get(), path) ;
- }
-
- // ---- Clone the access path nodes.
- // Path is the route to this page - it does not include this page.
- // Work from the bottom to the top, the reverse order of AccessPath
- if ( loggingCall )
- log(pageLog, " page>> %s", page.label()) ;
- boolean changed = page.promote();
- if ( loggingCall ) {
- if ( changed )
- log(pageLog, " page<< %s", page.label()) ;
- else
- log(pageLog, " .. no change") ;
- }
-
- if ( changed )
- page.write() ; // Being careful.
-
- // Even if the page did not change, make sure the chain is dealt with.
- // e.g. promote in place policies.
- //if ( changed ) {
- if ( path != null ) {
- // Sequence of promote1 calls? + root.
-
- List<AccessStep> steps = path.getPath() ;
-
- int newPtr = page.getId() ;
- BPTreePage newPage = null ;
- boolean previousChanged = changed ;
- BPTreeNode newRoot = null ;
-
- if ( logging(pageLog) )
- log(pageLog, "Path: %s", path) ;
- // Duplicate from bottom to top.
- for ( int i = steps.size() - 1 ; i >= 0 ; i-- ) {
- AccessStep s = steps.get(i) ;
- // duplicate
- BPTreeNode n = s.node ;
- if ( logging(pageLog) )
- log(pageLog, " >> %s", n) ;
-
- changed = n.promote() ;
-
- if ( previousChanged ) {
- // Even if n did not change, if it's sub changed, need to update s.idx.
- n.ptrs.set(s.idx, newPtr) ;
- } else {
- if ( ! changed ) {
- if ( logging(pageLog) )
- log(pageLog, " .. no change") ;
- continue ;
- }
- }
-
- previousChanged = changed ;
-
- if ( logging(pageLog) )
- log(pageLog, " << %s", n) ;
-
- if ( n.isRoot() ) {
- if ( newRoot != null)
- throw new InternalErrorException("New root already found") ;
- newRoot = n ;
- }
- newPtr = n.getId() ;
- n.write() ;
-
- }
- if ( newRoot != null ) {
- if ( loggingCall )
- log(pageLog, " new root %s", newRoot) ;
- page.bpTree.newRoot(newRoot) ;
- }
- } // end of "if ( path != null )"
- }
-
- /**
- * The initial tree is a single root node with no records block below it. It
- * is an illegal tree. We make it by creating a real tree, deleting and
- * freeing the records block from the root, then resetting the records block
- * manager. This is to avoid having specialized creating code in
- * BPlusTreeFactory solely for the rewriter.
- */
- public static BPlusTree createRootOnlyBPTree(BPlusTreeParams bptParams,
- BufferChannel bptState,
- BlockMgr blkMgrNodes,
- BlockMgr blkMgrRecords) {
- BPlusTree bpt = BPlusTreeFactory.createNonTxn(bptParams, bptState, blkMgrNodes, blkMgrRecords) ;
-
- BPTreeRecordsMgr recordsMgr = bpt.getRecordsMgr() ;
-
- BPTreeRecords recordsPage = recordsMgr.getWrite(0) ;
-// recordsPage.getRecordBuffer().clear();
-// recordsMgr.write(recordsPage) ;
- recordsMgr.free(recordsPage);
-// recordsMgr.release(recordsPage);
- recordsMgr.resetAlloc(0);
-
- BPTreeNodeMgr nodeMgr = bpt.getNodeManager() ;
-
- // Alter the root node.
- BPTreeNode root = nodeMgr.getWrite(BPlusTreeParams.RootId, BPlusTreeParams.RootParent) ;
- int rootId = root.getId() ;
- if ( rootId != 0 )
- throw new BPTreeException("**** Not the root: " + rootId) ;
-
- // Undo the records block.
- root.getPtrBuffer().clear() ;
- root.getRecordBuffer().clear() ;
- //root.setCount(-1);
- nodeMgr.write(root);
- nodeMgr.release(root);
-
- // Now a broken tree of one root block and no records.
- return bpt ;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPTStateMgr.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPTStateMgr.java b/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPTStateMgr.java
deleted file mode 100644
index 606ae9a..0000000
--- a/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPTStateMgr.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.trans.bplustree;
-
-import org.apache.jena.atlas.logging.FmtLog ;
-import org.seaborne.dboe.base.file.BufferChannel ;
-import org.seaborne.dboe.transaction.txn.StateMgrData ;
-import org.slf4j.Logger ;
-import org.slf4j.LoggerFactory ;
-
-/** Manage the persistent state of the tree
- *
- * This consists of last commited root, and the limits on the blocks for both
- * nodes and records.
- *
- * (rootId/int-as-long, nodeAllocLimit/long, recordsAllocLimit/long)
- */
-public class BPTStateMgr extends StateMgrData {
- private static Logger log = LoggerFactory.getLogger(BPTStateMgr.class) ;
-
- private static int idxRoot = 0 ;
- private static int idxNodeBlocksLimit = 1 ;
- private static int idxRecordsBlocksLimit = 2 ;
-
- private int currentRoot() { return (int)super.get(idxRoot) ; }
- private long nodeBlocksLimit() { return super.get(idxNodeBlocksLimit) ; }
- private long recordsBlocksLimit() { return super.get(idxRecordsBlocksLimit) ; }
-
- private void currentRoot(int x) { super.set(idxRoot, x) ; }
- private void nodeBlocksLimit(long x) { super.set(idxNodeBlocksLimit, x) ; }
- private void recordsBlocksLimit(long x) { super.set(idxRecordsBlocksLimit, x) ; }
-
- private boolean LOGGING = BPT.Logging ;
-
- public BPTStateMgr(BufferChannel storage) {
- // These values are the values for a null tree (no blocks).
- super(storage, 0L, 0L, 0L) ;
- }
-
- /*package*/ void setState(int rootIdx, long nodeBlkLimit, long recordsBlkLimit) {
- currentRoot(rootIdx) ;
- nodeBlocksLimit(nodeBlkLimit) ;
- recordsBlocksLimit(recordsBlkLimit) ;
- log("Set") ;
- setDirtyFlag() ;
- // But don't write it.
- }
- @Override
- protected void writeStateEvent() {
- log("Write") ;
- }
-
- @Override
- protected void readStateEvent() {
- log("Read") ;
- }
-
- private void log(String operation) {
- if ( LOGGING )
- FmtLog.info(log, "%s state: root=%d // node block limit = %d // records block limit %d", operation, currentRoot(), nodeBlocksLimit(), recordsBlocksLimit()) ;
- }
-
- public int getRoot() {
- return currentRoot() ;
- }
-
- public long getNodeBlocksLimit() {
- return nodeBlocksLimit() ;
- }
-
- public long getRecordsBlocksLimit() {
- return recordsBlocksLimit() ;
- }
-}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d456654/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPTreeException.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPTreeException.java b/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPTreeException.java
deleted file mode 100644
index c7e326e..0000000
--- a/jena-db/jena-dboe-trans-data/src/main/java/org/seaborne/dboe/trans/bplustree/BPTreeException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.seaborne.dboe.trans.bplustree;
-
-import org.seaborne.dboe.DBOpEnvException ;
-
-public class BPTreeException extends DBOpEnvException
-{
- public BPTreeException() {}
- public BPTreeException(String msg) { super(msg) ; }
-}