You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:40 UTC
[09/78] [abbrv] [partial] incubator-systemml git commit: Move files
to new package folder structure
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/ProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/ProgramBlock.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/ProgramBlock.java
deleted file mode 100644
index b0cb2af..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/ProgramBlock.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram;
-
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.api.MLContextProxy;
-import com.ibm.bi.dml.hops.Hop;
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.hops.recompile.Recompiler;
-import com.ibm.bi.dml.parser.StatementBlock;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLScriptException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.context.ExecutionContext;
-import com.ibm.bi.dml.runtime.instructions.Instruction;
-import com.ibm.bi.dml.runtime.instructions.cp.BooleanObject;
-import com.ibm.bi.dml.runtime.instructions.cp.ComputationCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.Data;
-import com.ibm.bi.dml.runtime.instructions.cp.DoubleObject;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-import com.ibm.bi.dml.runtime.instructions.cp.ScalarObject;
-import com.ibm.bi.dml.runtime.instructions.cp.StringObject;
-import com.ibm.bi.dml.runtime.instructions.cp.VariableCPInstruction;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.utils.Statistics;
-import com.ibm.bi.dml.yarn.DMLAppMasterUtils;
-
-
-/**
- * Generic runtime program block: holds a list of instructions, executes them
- * against an {@link ExecutionContext} (after dynamic recompilation of the
- * attached statement block, if enabled and required), and offers predicate
- * execution to callers such as loop and branch blocks. It also stores the
- * source position of the originating statement block for error messages.
- */
-public class ProgramBlock
-{
-    protected static final Log LOG = LogFactory.getLog(ProgramBlock.class.getName());
-
-    //debugging switch: if enabled, nnz and sparse/dense representation of all
-    //matrices in the symbol table are validated after every single instruction
-    private static final boolean CHECK_MATRIX_SPARSITY = false;
-
-    protected Program _prog; // pointer to Program this ProgramBlock is part of
-    protected ArrayList<Instruction> _inst;
-
-    //additional attributes for recompile
-    protected StatementBlock _sb = null;
-    protected long _tid = 0; //by default _t0
-
-
-    /**
-     * Creates a program block without instructions for the given program.
-     *
-     * @param prog program this block is part of
-     * @throws DMLRuntimeException part of the declared signature; the code in
-     *         this constructor does not throw it
-     */
-    public ProgramBlock(Program prog)
-        throws DMLRuntimeException
-    {
-        _prog = prog;
-        _inst = new ArrayList<Instruction>();
-    }
-
-
-    ////////////////////////////////////////////////
-    // getters, setters and similar functionality
-    ////////////////////////////////////////////////
-
-    /** @return the program this block is part of */
-    public Program getProgram(){
-        return _prog;
-    }
-
-    /** @param prog the program this block is part of */
-    public void setProgram(Program prog){
-        _prog = prog;
-    }
-
-    /** @return the statement block used for recompilation, or null if none was set */
-    public StatementBlock getStatementBlock(){
-        return _sb;
-    }
-
-    /** @param sb the statement block used for recompilation */
-    public void setStatementBlock( StatementBlock sb ){
-        _sb = sb;
-    }
-
-    /** @return the internal (mutable, not copied) instruction list */
-    public ArrayList<Instruction> getInstructions() {
-        return _inst;
-    }
-
-    /**
-     * @param i position in the instruction list
-     * @return the instruction at position i
-     */
-    public Instruction getInstruction(int i) {
-        return _inst.get(i);
-    }
-
-    /** @param inst list that replaces the current instruction list (stored by reference) */
-    public void setInstructions( ArrayList<Instruction> inst ) {
-        _inst = inst;
-    }
-
-    /** @param inst instruction appended to the end of the instruction list */
-    public void addInstruction(Instruction inst) {
-        _inst.add(inst);
-    }
-
-    /** @param inst instructions appended, in order, to the end of the instruction list */
-    public void addInstructions(ArrayList<Instruction> inst) {
-        _inst.addAll(inst);
-    }
-
-    /** @return number of instructions currently held by this block */
-    public int getNumInstructions() {
-        return _inst.size();
-    }
-
-    /** @param id thread id that is passed on to the recompiler */
-    public void setThreadID( long id ){
-        _tid = id;
-    }
-
-
-    //////////////////////////////////////////////////////////
-    // core instruction execution (program block, predicate)
-    //////////////////////////////////////////////////////////
-
-    /**
-     * Executes this program block (incl recompilation if required).
-     *
-     * @param ec execution context providing the symbol table
-     * @throws DMLRuntimeException if recompilation or instruction execution fails
-     * @throws DMLUnsupportedOperationException
-     */
-    public void execute(ExecutionContext ec)
-        throws DMLRuntimeException, DMLUnsupportedOperationException
-    {
-        ArrayList<Instruction> tmp = _inst;
-
-        //dynamically recompile instructions if enabled and required
-        try
-        {
-            if( DMLScript.isActiveAM() ) //set program block specific remote memory
-                DMLAppMasterUtils.setupProgramBlockRemoteMaxMemory(this);
-
-            long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-            if(    OptimizerUtils.ALLOW_DYN_RECOMPILATION
-                && _sb != null
-                && _sb.requiresRecompilation() )
-            {
-                tmp = Recompiler.recompileHopsDag(_sb, _sb.get_hops(), ec.getVariables(), null, false, _tid);
-
-                if( MLContextProxy.isActive() )
-                    tmp = MLContextProxy.performCleanupAfterRecompilation(tmp);
-            }
-            if( DMLScript.STATISTICS ){
-                long t1 = System.nanoTime();
-                Statistics.incrementHOPRecompileTime(t1-t0);
-                //a different list reference indicates that recompilation took place
-                if( tmp!=_inst )
-                    Statistics.incrementHOPRecompileSB();
-            }
-        }
-        catch(Exception ex)
-        {
-            throw new DMLRuntimeException("Unable to recompile program block.", ex);
-        }
-
-        //actual instruction execution
-        executeInstructions(tmp, ec);
-    }
-
-    /**
-     * Executes given predicate instructions (incl recompilation if required)
-     *
-     * @param inst predicate instructions
-     * @param hops predicate hop dag handed to the recompiler
-     * @param requiresRecompile true if the predicate should be recompiled (only
-     *        honored if dynamic recompilation is enabled)
-     * @param retType value type the returned scalar is converted to
-     * @param ec execution context providing the symbol table
-     * @return scalar result of the predicate
-     * @throws DMLRuntimeException if recompilation or instruction execution fails
-     * @throws DMLUnsupportedOperationException
-     */
-    public ScalarObject executePredicate(ArrayList<Instruction> inst, Hop hops, boolean requiresRecompile, ValueType retType, ExecutionContext ec)
-        throws DMLRuntimeException, DMLUnsupportedOperationException
-    {
-        ArrayList<Instruction> tmp = inst;
-
-        //dynamically recompile instructions if enabled and required
-        try {
-            long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-            if(    OptimizerUtils.ALLOW_DYN_RECOMPILATION
-                && requiresRecompile )
-            {
-                tmp = Recompiler.recompileHopsDag(hops, ec.getVariables(), null, false, _tid);
-            }
-            if( DMLScript.STATISTICS ){
-                long t1 = System.nanoTime();
-                Statistics.incrementHOPRecompileTime(t1-t0);
-                if( tmp!=inst )
-                    Statistics.incrementHOPRecompilePred();
-            }
-        }
-        catch(Exception ex)
-        {
-            throw new DMLRuntimeException("Unable to recompile predicate instructions.", ex);
-        }
-
-        //actual instruction execution
-        return executePredicateInstructions(tmp, retType, ec);
-    }
-
-    /**
-     * Executes the given instructions one after another, in list order.
-     *
-     * @param inst instructions to execute
-     * @param ec execution context providing the symbol table
-     * @throws DMLRuntimeException if an instruction fails
-     * @throws DMLUnsupportedOperationException
-     */
-    protected void executeInstructions(ArrayList<Instruction> inst, ExecutionContext ec)
-        throws DMLRuntimeException, DMLUnsupportedOperationException
-    {
-        for (int i = 0; i < inst.size(); i++)
-        {
-            //indexed access required due to dynamic add
-            Instruction currInst = inst.get(i);
-
-            //execute instruction
-            ec.updateDebugState(i);
-            executeSingleInstruction(currInst, ec);
-        }
-    }
-
-    /**
-     * Executes predicate instructions in two passes: first all instructions that
-     * are not remove-variable instructions, then (after the result has been
-     * read from the symbol table) the remove-variable instructions. The result
-     * is converted to the requested value type if it has a different one.
-     *
-     * @param inst predicate instructions
-     * @param retType requested value type of the result
-     * @param ec execution context providing the symbol table
-     * @return scalar result, taken from the output variable of the last executed
-     *         computation or variable instruction that reports an output name
-     * @throws DMLRuntimeException if an instruction fails
-     * @throws DMLUnsupportedOperationException
-     */
-    protected ScalarObject executePredicateInstructions(ArrayList<Instruction> inst, ValueType retType, ExecutionContext ec)
-        throws DMLRuntimeException, DMLUnsupportedOperationException
-    {
-        ScalarObject ret = null;
-        String retName = null;
-
-        //execute all instructions
-        for (int i = 0; i < inst.size(); i++)
-        {
-            //indexed access required due to debug mode
-            Instruction currInst = inst.get(i);
-            if( !isRemoveVariableInstruction(currInst) )
-            {
-                //execute instruction
-                ec.updateDebugState(i);
-                executeSingleInstruction(currInst, ec);
-
-                //get last return name
-                if(currInst instanceof ComputationCPInstruction )
-                    retName = ((ComputationCPInstruction) currInst).getOutputVariableName();
-                else if(currInst instanceof VariableCPInstruction && ((VariableCPInstruction)currInst).getOutputVariableName()!=null)
-                    retName = ((VariableCPInstruction)currInst).getOutputVariableName();
-            }
-        }
-
-        //get return value TODO: how do we differentiate literals and variables?
-        ret = (ScalarObject) ec.getScalarInput(retName, retType, false);
-
-        //execute rmvar instructions (deferred so that the result could be read first)
-        for (int i = 0; i < inst.size(); i++) {
-            //indexed access required due to debug mode
-            Instruction currInst = inst.get(i);
-            if( isRemoveVariableInstruction(currInst) ) {
-                ec.updateDebugState(i);
-                executeSingleInstruction(currInst, ec);
-            }
-        }
-
-        //check and correct scalar ret type (incl save double to int)
-        if( ret.getValueType() != retType )
-        {
-            switch( retType ) {
-                case BOOLEAN: ret = new BooleanObject(ret.getName(),ret.getBooleanValue()); break;
-                case INT:     ret = new IntObject(ret.getName(),ret.getLongValue()); break;
-                case DOUBLE:  ret = new DoubleObject(ret.getName(),ret.getDoubleValue()); break;
-                case STRING:  ret = new StringObject(ret.getName(),ret.getStringValue()); break;
-                default:
-                    //do nothing, i.e., keep the object as obtained above
-                    break;
-            }
-        }
-
-        return ret;
-    }
-
-    /**
-     * Runs pre-processing, processing and post-processing of one instruction
-     * and maintains statistics / trace output if enabled.
-     *
-     * Outside of debug mode, a DMLScriptException is rethrown unchanged and any
-     * other exception is wrapped into a DMLRuntimeException carrying the block
-     * location; in debug mode the exception is handed to the execution context.
-     *
-     * @param currInst instruction to execute
-     * @param ec execution context providing the symbol table
-     * @throws DMLRuntimeException if the instruction fails (outside debug mode)
-     */
-    private void executeSingleInstruction( Instruction currInst, ExecutionContext ec )
-        throws DMLRuntimeException
-    {
-        try
-        {
-            // start time measurement for statistics
-            long t0 = (DMLScript.STATISTICS || LOG.isTraceEnabled()) ?
-                System.nanoTime() : 0;
-
-            // pre-process instruction (debug state, inst patching, listeners)
-            Instruction tmp = currInst.preprocessInstruction( ec );
-
-            // process actual instruction
-            tmp.processInstruction( ec );
-
-            // post-process instruction (debug)
-            tmp.postprocessInstruction( ec );
-
-            // maintain aggregate statistics
-            if( DMLScript.STATISTICS) {
-                Statistics.maintainCPHeavyHitters(
-                    tmp.getExtendedOpcode(), System.nanoTime()-t0);
-            }
-
-            // optional trace information (instruction and runtime)
-            if( LOG.isTraceEnabled() ) {
-                long t1 = System.nanoTime();
-                String time = String.format("%.3f",((double)t1-t0)/1000000000);
-                LOG.trace("Instruction: "+ tmp + " (executed in " + time + "s).");
-            }
-
-            // optional check for correct nnz and sparse/dense representation of all
-            // variables in symbol table (for tracking source of wrong representation)
-            if( CHECK_MATRIX_SPARSITY ) {
-                checkSparsity( tmp, ec.getVariables() );
-            }
-        }
-        catch (Exception e)
-        {
-            if (!DMLScript.ENABLE_DEBUG_MODE) {
-                if ( e instanceof DMLScriptException)
-                    throw (DMLScriptException)e;
-                else
-                    throw new DMLRuntimeException(this.printBlockErrorLocation() + "Error evaluating instruction: " + currInst.toString() , e);
-            }
-            else {
-                ec.handleDebugException(e);
-            }
-        }
-    }
-
-    /**
-     * @param inst instruction to test
-     * @return true if inst is a variable instruction that removes a variable
-     */
-    private boolean isRemoveVariableInstruction(Instruction inst)
-    {
-        return ( inst instanceof VariableCPInstruction && ((VariableCPInstruction)inst).isRemoveVariable() );
-    }
-
-    /** Calls printMe() on every instruction of this block, in list order. */
-    public void printMe() {
-        for (Instruction i : this._inst) {
-            i.printMe();
-        }
-    }
-
-    /**
-     * Debugging aid: for every dirty, non-partitioned matrix in the symbol
-     * table, recomputes the number of non-zeros and re-examines the sparsity,
-     * and fails if either differs from the state before the recomputation.
-     *
-     * @param lastInst instruction executed last (only used in error messages)
-     * @param vars symbol table to check
-     * @throws DMLRuntimeException if nnz or the sparse/dense representation of a
-     *         matrix changed through the recomputation
-     */
-    private void checkSparsity( Instruction lastInst, LocalVariableMap vars )
-        throws DMLRuntimeException
-    {
-        for( String varname : vars.keySet() )
-        {
-            Data dat = vars.get(varname);
-            if( !(dat instanceof MatrixObject) )
-                continue;
-
-            MatrixObject mo = (MatrixObject)dat;
-            if( !mo.isDirty() || mo.isPartitioned() )
-                continue;
-
-            boolean sparse1, sparse2;
-            long nnz1, nnz2;
-
-            MatrixBlock mb = mo.acquireRead();
-            try
-            {
-                sparse1 = mb.isInSparseFormat();
-                nnz1 = mb.getNonZeros();
-                synchronized( mb ) { //potential state change
-                    mb.recomputeNonZeros();
-                    mb.examSparsity();
-                }
-                sparse2 = mb.isInSparseFormat();
-                nnz2 = mb.getNonZeros();
-            }
-            finally
-            {
-                //always give the matrix back, even if the recomputation failed,
-                //so that an error here does not leave the object acquired
-                mo.release();
-            }
-
-            if( nnz1 != nnz2 )
-                throw new DMLRuntimeException("Matrix nnz meta data was incorrect: ("+varname+", actual="+nnz1+", expected="+nnz2+", inst="+lastInst+")");
-
-            if( sparse1 != sparse2 )
-                throw new DMLRuntimeException("Matrix was in wrong data representation: ("+varname+", actual="+sparse1+", expected="+sparse2+", nnz="+nnz1+", inst="+lastInst+")");
-        }
-    }
-
-    ///////////////////////////////////////////////////////////////////////////
-    // store position information for program blocks
-    ///////////////////////////////////////////////////////////////////////////
-
-    //public fields are part of the existing interface (see accessors below)
-    public int _beginLine, _beginColumn;
-    public int _endLine, _endColumn;
-
-    public void setBeginLine(int passed)   { _beginLine = passed;   }
-    public void setBeginColumn(int passed) { _beginColumn = passed; }
-    public void setEndLine(int passed)     { _endLine = passed;     }
-    public void setEndColumn(int passed)   { _endColumn = passed;   }
-
-    /**
-     * Sets all four position attributes at once.
-     *
-     * @param blp begin line
-     * @param bcp begin column
-     * @param elp end line
-     * @param ecp end column
-     */
-    public void setAllPositions(int blp, int bcp, int elp, int ecp){
-        _beginLine = blp;
-        _beginColumn = bcp;
-        _endLine = elp;
-        _endColumn = ecp;
-    }
-
-    public int getBeginLine()   { return _beginLine;   }
-    public int getBeginColumn() { return _beginColumn; }
-    public int getEndLine()     { return _endLine;     }
-    public int getEndColumn()   { return _endColumn;   }
-
-    /** @return error message prefix with begin line and begin column */
-    public String printErrorLocation(){
-        return "ERROR: line " + _beginLine + ", column " + _beginColumn + " -- ";
-    }
-
-    /** @return error message prefix with the line range of the statement block */
-    public String printBlockErrorLocation(){
-        return "ERROR: Runtime error in program block generated from statement block between lines " + _beginLine + " and " + _endLine + " -- ";
-    }
-
-    /** @return warning message prefix with begin line and begin column */
-    public String printWarningLocation(){
-        return "WARNING: line " + _beginLine + ", column " + _beginColumn + " -- ";
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/WhileProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/WhileProgramBlock.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/WhileProgramBlock.java
deleted file mode 100644
index cea071c..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/WhileProgramBlock.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram;
-
-import java.util.ArrayList;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.hops.Hop;
-import com.ibm.bi.dml.parser.WhileStatementBlock;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLScriptException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.context.ExecutionContext;
-import com.ibm.bi.dml.runtime.instructions.Instruction;
-import com.ibm.bi.dml.runtime.instructions.Instruction.INSTRUCTION_TYPE;
-import com.ibm.bi.dml.runtime.instructions.cp.BooleanObject;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.ComputationCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.Data;
-import com.ibm.bi.dml.runtime.instructions.cp.ScalarObject;
-import com.ibm.bi.dml.runtime.instructions.cp.StringObject;
-import com.ibm.bi.dml.runtime.instructions.cp.VariableCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction.CPINSTRUCTION_TYPE;
-import com.ibm.bi.dml.yarn.DMLAppMasterUtils;
-
-
-/**
- * Program block for a while loop: evaluates a boolean predicate, executes all
- * child blocks as long as the predicate is true, and finally executes the
- * exit instructions.
- */
-public class WhileProgramBlock extends ProgramBlock
-{
-    private ArrayList<Instruction> _predicate;
-    private String _predicateResultVar;
-    private ArrayList<Instruction> _exitInstructions;
-    private ArrayList<ProgramBlock> _childBlocks;
-
-    /**
-     * Creates a while block without child blocks and exit instructions.
-     *
-     * @param prog program this block is part of
-     * @param predicate predicate instructions; may be null or empty, in which
-     *        case the predicate result variable stays null until it is set via
-     *        {@link #setPredicateResultVar(String)} or {@link #setPredicate(ArrayList)}
-     * @throws DMLRuntimeException declared by the super constructor
-     */
-    public WhileProgramBlock(Program prog, ArrayList<Instruction> predicate) throws DMLRuntimeException{
-        super(prog);
-        _predicate = predicate;
-        _predicateResultVar = findPredicateResultVar();
-        _exitInstructions = new ArrayList<Instruction>();
-        _childBlocks = new ArrayList<ProgramBlock>();
-    }
-
-    /**
-     * Prints the predicate instructions, all child blocks and the exit
-     * instructions via their printMe() methods; section headers go to the
-     * debug log.
-     */
-    @Override
-    public void printMe() {
-
-        LOG.debug("***** while current block predicate inst: *****");
-        for (Instruction cp : _predicate){
-            cp.printMe();
-        }
-
-        for (ProgramBlock pb : this._childBlocks){
-            pb.printMe();
-        }
-
-        LOG.debug("***** current block inst exit: *****");
-        for (Instruction i : this._exitInstructions) {
-            i.printMe();
-        }
-    }
-
-    /** @param childBlock block appended to the loop body */
-    public void addProgramBlock(ProgramBlock childBlock) {
-        _childBlocks.add(childBlock);
-    }
-
-    /** @param exitInstructions list that replaces the exit instructions (stored by reference) */
-    public void setExitInstructions2(ArrayList<Instruction> exitInstructions)
-        { _exitInstructions = exitInstructions; }
-
-    /**
-     * NOTE: despite its name, this method replaces the predicate instructions,
-     * not the exit instructions. In contrast to {@link #setPredicate(ArrayList)}
-     * it does not update the predicate result variable.
-     *
-     * @param predicate list that replaces the predicate instructions
-     */
-    public void setExitInstructions1(ArrayList<Instruction> predicate)
-        { _predicate = predicate; }
-
-    /** @param inst instruction appended to the exit instructions */
-    public void addExitInstruction(Instruction inst)
-        { _exitInstructions.add(inst); }
-
-    /** @return the predicate instructions (internal list, may be null) */
-    public ArrayList<Instruction> getPredicate()
-        { return _predicate; }
-
-    /**
-     * Replaces the predicate instructions and, for a non-empty predicate,
-     * re-derives the predicate result variable from them.
-     *
-     * @param predicate new predicate instructions
-     */
-    public void setPredicate( ArrayList<Instruction> predicate )
-    {
-        _predicate = predicate;
-
-        //update result var if non-empty predicate (otherwise,
-        //do not overwrite varname predicate in predicateResultVar)
-        if( _predicate != null && !_predicate.isEmpty() )
-            _predicateResultVar = findPredicateResultVar();
-    }
-
-    /** @return name of the variable (or literal) holding the predicate result */
-    public String getPredicateResultVar()
-        { return _predicateResultVar; }
-
-    /** @param resultVar name of the variable (or literal) holding the predicate result */
-    public void setPredicateResultVar(String resultVar)
-        { _predicateResultVar = resultVar; }
-
-    /** @return the exit instructions (internal list) */
-    public ArrayList<Instruction> getExitInstructions()
-        { return _exitInstructions; }
-
-    /**
-     * Evaluates the loop predicate. With predicate instructions, they are
-     * executed (incl recompilation if a statement block is attached); without,
-     * the predicate result variable is read directly from the execution context.
-     *
-     * @param ec execution context providing the symbol table
-     * @return boolean predicate result
-     * @throws DMLRuntimeException if the predicate cannot be evaluated or
-     *         evaluates to a string
-     * @throws DMLUnsupportedOperationException
-     */
-    private BooleanObject executePredicate(ExecutionContext ec)
-        throws DMLRuntimeException, DMLUnsupportedOperationException
-    {
-        BooleanObject result = null;
-        try
-        {
-            if( _predicate!=null && !_predicate.isEmpty() )
-            {
-                if( _sb!=null )
-                {
-                    if( DMLScript.isActiveAM() ) //set program block specific remote memory
-                        DMLAppMasterUtils.setupProgramBlockRemoteMaxMemory(this);
-
-                    WhileStatementBlock wsb = (WhileStatementBlock)_sb;
-                    Hop predicateOp = wsb.getPredicateHops();
-                    boolean recompile = wsb.requiresPredicateRecompilation();
-                    result = (BooleanObject) executePredicate(_predicate, predicateOp, recompile, ValueType.BOOLEAN, ec);
-                }
-                else
-                    result = (BooleanObject) executePredicate(_predicate, null, false, ValueType.BOOLEAN, ec);
-            }
-            else
-            {
-                //get result var
-                ScalarObject scalarResult = null;
-                Data resultData = ec.getVariable(_predicateResultVar);
-                if ( resultData == null ) {
-                    //note: resultvar is a literal (it cannot be of value type String, hence no literal/varname conflict)
-                    scalarResult = ec.getScalarInput(_predicateResultVar, ValueType.BOOLEAN, true);
-                }
-                else {
-                    scalarResult = ec.getScalarInput(_predicateResultVar, ValueType.BOOLEAN, false);
-                }
-
-                //check for invalid type String
-                if (scalarResult instanceof StringObject)
-                    throw new DMLRuntimeException(this.printBlockErrorLocation() + "\nWhile predicate variable "+ _predicateResultVar + " evaluated to string " + scalarResult + " which is not allowed for predicates in DML");
-
-                //process result
-                if( scalarResult instanceof BooleanObject )
-                    result = (BooleanObject)scalarResult;
-                else
-                    result = new BooleanObject( scalarResult.getBooleanValue() ); //auto casting
-            }
-        }
-        catch(Exception ex)
-        {
-            LOG.trace("\nWhile predicate variables: "+ ec.getVariables().toString());
-            throw new DMLRuntimeException(this.printBlockErrorLocation() + "Failed to evaluate the WHILE predicate.", ex);
-        }
-
-        //(guaranteed to be non-null, see executePredicate/getScalarInput)
-        return result;
-    }
-
-    /**
-     * Executes the while loop: predicate, child blocks in order while the
-     * predicate holds, and the exit instructions afterwards.
-     *
-     * @param ec execution context providing the symbol table
-     * @throws DMLRuntimeException if predicate, body or exit instructions fail;
-     *         a DMLScriptException raised in the loop is rethrown unchanged
-     * @throws DMLUnsupportedOperationException
-     */
-    @Override
-    public void execute(ExecutionContext ec) throws DMLRuntimeException, DMLUnsupportedOperationException {
-
-        BooleanObject predResult = executePredicate(ec);
-
-        //execute while loop
-        try
-        {
-            while(predResult.getBooleanValue())
-            {
-                //execute all child blocks
-                for (int i=0 ; i < _childBlocks.size() ; i++) {
-                    ec.updateDebugState(i);
-                    _childBlocks.get(i).execute(ec);
-                }
-
-                predResult = executePredicate(ec);
-            }
-        }
-        catch(DMLScriptException e)
-        {
-            throw e;
-        }
-        catch(Exception e)
-        {
-            LOG.trace("\nWhile predicate variables: "+ ec.getVariables().toString());
-            throw new DMLRuntimeException(this.printBlockErrorLocation() + "Error evaluating while program block.", e);
-        }
-
-        //execute exit instructions
-        try {
-            executeInstructions(_exitInstructions, ec);
-        }
-        catch(Exception e)
-        {
-            throw new DMLRuntimeException(this.printBlockErrorLocation() + "Error executing while exit instructions.", e);
-        }
-    }
-
-    /** @return the child blocks forming the loop body (internal list) */
-    public ArrayList<ProgramBlock> getChildBlocks() {
-        return _childBlocks;
-    }
-
-    /** @param childs list that replaces the child blocks (stored by reference) */
-    public void setChildBlocks(ArrayList<ProgramBlock> childs)
-    {
-        _childBlocks = childs;
-    }
-
-    /**
-     * Derives the predicate result variable from the predicate instructions:
-     * the output variable of the last control-program instruction that is not
-     * a variable instruction, or of the last variable cast instruction.
-     *
-     * @return the output variable name, or null if the predicate is null or
-     *         contains no such instruction
-     */
-    private String findPredicateResultVar ( ) {
-        String result = null;
-
-        //a missing predicate is legal (see setPredicate/executePredicate), so
-        //do not fail with a NullPointerException when called from the constructor
-        if( _predicate == null )
-            return result;
-
-        for ( Instruction si : _predicate ) {
-            if ( si.getType() == INSTRUCTION_TYPE.CONTROL_PROGRAM && ((CPInstruction)si).getCPInstructionType() != CPINSTRUCTION_TYPE.Variable ) {
-                result = ((ComputationCPInstruction) si).getOutputVariableName();
-            }
-            else if(si instanceof VariableCPInstruction && ((VariableCPInstruction)si).isVariableCastInstruction()){
-                result = ((VariableCPInstruction)si).getOutputVariableName();
-            }
-        }
-
-        return result;
-    }
-
-    /** @return error message prefix with the line range of the while statement block */
-    @Override
-    public String printBlockErrorLocation(){
-        return "ERROR: Runtime error in while program block generated from while statement block between lines " + _beginLine + " and " + _endLine + " -- ";
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/ByteBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/ByteBuffer.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/ByteBuffer.java
deleted file mode 100644
index 97b6a18..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/ByteBuffer.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-
-/**
- * Wrapper for WriteBuffer byte array per matrix in order to
- * support matrix serialization outside global lock.
- *
- * Depending on the target representation, the buffer either holds a byte
- * array with the serialized matrix (sparse target) or a strong reference to
- * the matrix block itself (dense target).
- *
- * The serialized flag is volatile because {@link #checkSerialized()} polls it
- * while waiting for {@link #serializeMatrix(MatrixBlock)} to complete.
- */
-public class ByteBuffer
-{
-    //volatile: polled by checkSerialized; the write at the end of
-    //serializeMatrix also publishes _sparse, _bdata and _mdata to the reader
-    private volatile boolean _serialized;
-    private boolean _sparse;
-    private long _size;
-
-    protected byte[] _bdata = null; //sparse matrix
-    protected MatrixBlock _mdata = null; //dense matrix
-
-    /**
-     * Creates an empty, not yet serialized buffer.
-     *
-     * @param size buffer size in bytes
-     */
-    public ByteBuffer( long size )
-    {
-        _size = size;
-        _serialized = false;
-    }
-
-    /**
-     * Serializes the given matrix block into this buffer: a deep serialization
-     * into a byte array if the target representation is sparse, otherwise a
-     * shallow one that only keeps a reference to the block.
-     *
-     * NOTE(review): if this method fails, the buffer is never marked as
-     * serialized and checkSerialized() keeps waiting; confirm that callers
-     * handle this case.
-     *
-     * @param mb matrix block to serialize
-     * @throws IOException if the serialization fails
-     */
-    public void serializeMatrix( MatrixBlock mb )
-        throws IOException
-    {
-        boolean sparseSrc = mb.isInSparseFormat(); //current representation
-        boolean sparseTrgt = mb.evalSparseFormatOnDisk(); //intended target representation
-        _sparse = sparseTrgt;
-
-        try
-        {
-            if( _sparse ) //SPARSE/DENSE -> SPARSE
-            {
-                //deep serialize (for compression)
-                if( CacheableData.CACHING_BUFFER_PAGECACHE )
-                    _bdata = PageCache.getPage((int)_size);
-                if( _bdata==null )
-                    _bdata = new byte[(int)_size];
-                DataOutput dout = new CacheDataOutput(_bdata);
-                mb.write(dout);
-            }
-            else //SPARSE/DENSE -> DENSE
-            {
-                //change representation (if required), incl. free sparse
-                //(in-memory representation, if dense on disk then it will
-                //be guaranteed to be dense in memory as well)
-                if( sparseSrc )
-                    mb.examSparsity();
-
-                //shallow serialize
-                _mdata = mb;
-            }
-        }
-        catch(Exception ex)
-        {
-            throw new IOException("Failed to serialize matrix block.", ex);
-        }
-
-        //must remain the last write of this method (see _serialized)
-        _serialized = true;
-    }
-
-    /**
-     * Returns the matrix held by this buffer: a new block read from the byte
-     * array for sparse buffers, the referenced block for dense buffers.
-     *
-     * @return the matrix block
-     * @throws IOException if reading from the byte array fails
-     */
-    public MatrixBlock deserializeMatrix()
-        throws IOException
-    {
-        MatrixBlock ret = null;
-
-        if( _sparse )
-        {
-            CacheDataInput din = new CacheDataInput(_bdata);
-            ret = new MatrixBlock();
-            ret.readFields(din);
-        }
-        else
-        {
-            ret = _mdata;
-        }
-
-        return ret;
-    }
-
-    /**
-     * Writes the buffer content to the given local file.
-     *
-     * @param fname local file name
-     * @throws IOException if the write fails
-     */
-    public void evictBuffer( String fname )
-        throws IOException
-    {
-        if( _sparse )
-        {
-            //write out byte serialized array
-            LocalFileUtils.writeByteArrayToLocal(fname, _bdata);
-        }
-        else
-        {
-            //serialize matrix to output stream
-            LocalFileUtils.writeMatrixBlockToLocal(fname, _mdata);
-        }
-    }
-
-    /**
-     * Returns the buffer size in bytes.
-     *
-     * @return size as passed to the constructor
-     */
-    public long getSize()
-    {
-        return _size;
-    }
-
-    /**
-     * @return true if the buffer holds (or will hold) a byte-serialized matrix,
-     *         false if it holds a reference to the matrix block
-     */
-    public boolean isInSparseFormat()
-    {
-        return _sparse;
-    }
-
-    /**
-     * Drops the reference to the byte array or matrix block; with page caching
-     * enabled, the byte array is handed back to the page cache first.
-     */
-    public void freeMemory()
-    {
-        //clear strong references to buffer/matrix
-        if( _sparse )
-        {
-            if( CacheableData.CACHING_BUFFER_PAGECACHE )
-                PageCache.putPage(_bdata);
-            _bdata = null;
-        }
-        else
-        {
-            _mdata = null;
-        }
-    }
-
-    /**
-     * Blocks until the buffer has been marked as serialized, polling the flag
-     * in one millisecond steps. An interrupt does not end the wait; it is
-     * remembered and the interrupt status of the thread is restored on return.
-     */
-    public void checkSerialized()
-    {
-        //fast path: serialization already completed
-        if( _serialized )
-            return;
-
-        boolean interrupted = false;
-        try
-        {
-            while( !_serialized )
-            {
-                try {
-                    Thread.sleep(1);
-                }
-                catch(InterruptedException ex) {
-                    //keep waiting (as before), but do not lose the interrupt
-                    interrupted = true;
-                }
-            }
-        }
-        finally
-        {
-            if( interrupted )
-                Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Determines if byte buffer can hold the given size given this specific matrix block.
-     * This call is consistent with 'serializeMatrix' and allows for internal optimization
-     * according to dense/sparse representation.
-     *
-     * @param size required size in bytes
-     * @param mb matrix block to be serialized
-     * @return true if a buffer of the given size is valid for this block
-     */
-    public static boolean isValidCapacity( long size, MatrixBlock mb )
-    {
-        boolean sparseTrgt = mb.evalSparseFormatOnDisk(); //intended target representation
-
-        if( sparseTrgt ) //SPARSE
-        {
-            // since sparse matrix blocks are serialized into a byte representation
-            // the byte buffer can hold at most 2GB in size
-            return ( size <= Integer.MAX_VALUE );
-        }
-        else //DENSE
-        {
-            // since for dense matrix blocks we use a shallow serialize (strong reference),
-            // the byte buffer can hold any size (currently upper bounded by 16GB)
-            return true;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheDataInput.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheDataInput.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheDataInput.java
deleted file mode 100644
index f2802a3..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheDataInput.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlockDataInput;
-import com.ibm.bi.dml.runtime.matrix.data.SparseRow;
-
-public class CacheDataInput implements DataInput, MatrixBlockDataInput
-{
-
- protected byte[] _buff;
- protected int _bufflen;
- protected int _count;
-
- public CacheDataInput( byte[] mem )
- {
- _buff = mem;
- _bufflen = _buff.length;
- _count = 0;
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public boolean readBoolean()
- throws IOException
- {
- //mask to adhere to the input stream semantic
- return ( (_buff[_count++] & 0xFF) != 0 );
- }
-
- @Override
- public byte readByte()
- throws IOException
- {
- //mask to adhere to the input stream semantic
- return (byte) (_buff[_count++] & 0xFF);
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public short readShort() throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public char readChar() throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public int readInt()
- throws IOException
- {
- int ret = baToInt(_buff, _count);
- _count += 4;
-
- return ret;
- }
-
- @Override
- public long readLong()
- throws IOException
- {
- long ret = baToLong(_buff, _count);
- _count += 8;
-
- return ret;
- }
-
- @Override
- public float readFloat() throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public double readDouble()
- throws IOException
- {
- long tmp = baToLong(_buff, _count);
- double tmp2 = Double.longBitsToDouble(tmp);
- _count += 8;
-
- return tmp2;
- }
-
- @Override
- public String readLine() throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public String readUTF() throws IOException {
- throw new IOException("Not supported.");
- }
-
- ///////////////////////////////////////////////
- // Implementation of MatrixBlockDSMDataOutput
- ///////////////////////////////////////////////
-
- @Override
- public long readDoubleArray(int len, double[] varr)
- throws IOException
- {
- //counter for non-zero elements
- long nnz = 0;
-
- int off = _count;
- for( int i=0; i<len; i++ )
- {
- //core deserialization
- long tmp = baToLong(_buff, off+i*8);
- varr[i] = Double.longBitsToDouble( tmp );
-
- //nnz maintenance
- nnz += (varr[i]!=0) ? 1 : 0;
- }
- _count = off + len*8;
-
- return nnz;
- }
-
- @Override
- public long readSparseRows(int rlen, SparseRow[] rows)
- throws IOException
- {
- //counter for non-zero elements
- long nnz = 0;
-
- //read all individual sparse rows from input
- for( int i=0; i<rlen; i++ )
- {
- int lnnz = readInt();
-
- if( lnnz > 0 ) //non-zero row
- {
- //get handle to sparse (allocate if necessary)
- if( rows[i] == null )
- rows[i] = new SparseRow(lnnz);
- SparseRow arow = rows[i];
-
- //read single sparse row
- for( int j=0; j<lnnz; j++ )
- {
- int aix = baToInt(_buff, _count);
- long tmp = baToLong(_buff, _count+4);
- double aval = Double.longBitsToDouble( tmp );
- arow.append(aix, aval);
- _count+=12;
- }
-
- nnz += lnnz;
- }
- }
-
- return nnz;
- }
-
- /**
- *
- * @param a
- * @param off
- * @return
- */
- private static int baToInt( byte[] ba, final int off )
- {
- //shift and add 4 bytes into single int
- return ((ba[off+0] & 0xFF) << 24) +
- ((ba[off+1] & 0xFF) << 16) +
- ((ba[off+2] & 0xFF) << 8) +
- ((ba[off+3] & 0xFF) << 0);
- }
-
- /**
- *
- * @param a
- * @param off
- * @return
- */
- private static long baToLong( byte[] ba, final int off )
- {
- //shift and add 8 bytes into single long
- return ((long)(ba[off+0] & 0xFF) << 56) +
- ((long)(ba[off+1] & 0xFF) << 48) +
- ((long)(ba[off+2] & 0xFF) << 40) +
- ((long)(ba[off+3] & 0xFF) << 32) +
- ((long)(ba[off+4] & 0xFF) << 24) +
- ((long)(ba[off+5] & 0xFF) << 16) +
- ((long)(ba[off+6] & 0xFF) << 8) +
- ((long)(ba[off+7] & 0xFF) << 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheDataOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheDataOutput.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheDataOutput.java
deleted file mode 100644
index d821319..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheDataOutput.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlockDataOutput;
-import com.ibm.bi.dml.runtime.matrix.data.SparseRow;
-
-/**
- * Custom DataOutput to serialize directly into the given byte array.
- * No capacity checks are made in this class; the caller must supply an
- * array large enough for everything that is written.
- */
-public class CacheDataOutput implements DataOutput, MatrixBlockDataOutput
-{
-
- protected byte[] _buff; //target array; written in place, never reallocated in this class
- protected int _bufflen; //length of _buff captured at construction (not read in this class)
- protected int _count; //next write position in _buff
-
- public CacheDataOutput( byte[] mem )
- {
- _buff = mem;
- _bufflen = _buff.length;
- _count = 0;
- }
-
- @Override
- public void write(int b)
- throws IOException
- {
- _buff[_count++] = (byte)b; //only the low-order byte of b is stored
- }
-
- @Override
- public void write(byte[] b)
- throws IOException
- {
- System.arraycopy(b, 0, _buff, _count, b.length);
- _count += b.length;
- }
-
- @Override
- public void write(byte[] b, int off, int len)
- throws IOException
- {
- System.arraycopy(b, off, _buff, _count, len);
- _count += len;
- }
-
- @Override
- public void writeBoolean(boolean v)
- throws IOException
- {
- _buff[_count++] = (byte)( v ? 1 : 0 );
- }
-
-
- @Override
- public void writeInt(int v)
- throws IOException
- {
- intToBa(v, _buff, _count); //4 bytes, big-endian
- _count += 4;
- }
-
- @Override
- public void writeDouble(double v)
- throws IOException
- {
- long tmp = Double.doubleToRawLongBits(v); //raw bit pattern of the double
- longToBa(tmp, _buff, _count); //8 bytes, big-endian
- _count += 8;
- }
-
- @Override
- public void writeByte(int v) throws IOException {
- _buff[_count++] = (byte) v;
- }
-
- @Override
- public void writeBytes(String s) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void writeChar(int v) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void writeChars(String s) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void writeFloat(float v) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void writeLong(long v) throws IOException {
- longToBa(v, _buff, _count);
- _count += 8;
- }
-
- @Override
- public void writeShort(int v) throws IOException {
- throw new IOException("Not supported.");
- }
-
- @Override
- public void writeUTF(String s) throws IOException {
- throw new IOException("Not supported.");
- }
-
-
- ///////////////////////////////////////////////
- // Implementation of MatrixBlockDataOutput
- ///////////////////////////////////////////////
-
- @Override
- public void writeDoubleArray(int len, double[] varr)
- throws IOException
- {
- //original buffer offset
- int off = _count;
-
- //serialize the first len values of the array into the buffer (8 bytes each)
- for( int i=0; i<len; i++ )
- {
- long tmp = Double.doubleToRawLongBits(varr[i]);
- longToBa(tmp, _buff, off+i*8);
- }
-
- //update buffer offset
- _count = off + len*8;
- }
-
- @Override
- public void writeSparseRows(int rlen, SparseRow[] rows)
- throws IOException
- {
- int lrlen = Math.min(rows.length, rlen); //rows may hold fewer than rlen entries
-
- //process existing rows (per row: int entry count, then 12 bytes per entry)
- for( int i=0; i<lrlen; i++ )
- {
- SparseRow arow = rows[i];
- if( arow!=null && !arow.isEmpty() )
- {
- int alen = arow.size();
- int[] aix = arow.getIndexContainer();
- double[] avals = arow.getValueContainer();
-
- writeInt( alen );
-
- for( int j=0; j<alen; j++ )
- {
- intToBa(aix[j], _buff, _count);
- long tmp2 = Double.doubleToRawLongBits(avals[j]);
- longToBa(tmp2, _buff, _count+4);
- _count += 12; //index (4 bytes) + value (8 bytes)
- }
- }
- else
- writeInt( 0 ); //null or empty row is written as a zero entry count
- }
-
- //process remaining empty rows (so that exactly rlen entry counts are written)
- for( int i=lrlen; i<rlen; i++ )
- writeInt( 0 );
- }
-
- /**
- * Stores an int into four consecutive bytes of the array in big-endian order.
- * @param val value to encode
- * @param ba destination byte array
- * @param off index at which the most significant byte is written
- */
- private static void intToBa( final int val, byte[] ba, final int off )
- {
- //shift and mask out 4 bytes
- ba[ off+0 ] = (byte)((val >>> 24) & 0xFF);
- ba[ off+1 ] = (byte)((val >>> 16) & 0xFF);
- ba[ off+2 ] = (byte)((val >>> 8) & 0xFF);
- ba[ off+3 ] = (byte)((val >>> 0) & 0xFF);
- }
-
- /**
- * Stores a long into eight consecutive bytes of the array in big-endian order.
- * @param val value to encode
- * @param ba destination byte array
- * @param off index at which the most significant byte is written
- */
- private static void longToBa( final long val, byte[] ba, final int off )
- {
- //shift and mask out 8 bytes
- ba[ off+0 ] = (byte)((val >>> 56) & 0xFF);
- ba[ off+1 ] = (byte)((val >>> 48) & 0xFF);
- ba[ off+2 ] = (byte)((val >>> 40) & 0xFF);
- ba[ off+3 ] = (byte)((val >>> 32) & 0xFF);
- ba[ off+4 ] = (byte)((val >>> 24) & 0xFF);
- ba[ off+5 ] = (byte)((val >>> 16) & 0xFF);
- ba[ off+6 ] = (byte)((val >>> 8) & 0xFF);
- ba[ off+7 ] = (byte)((val >>> 0) & 0xFF);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheException.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheException.java
deleted file mode 100644
index 3516b8b..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-
-public class CacheException extends DMLRuntimeException
-{
- //Common superclass of CacheIOException and CacheStatusException;
- //every constructor simply delegates to DMLRuntimeException.
- private static final long serialVersionUID = 1L;
-
- public CacheException ()
- {
- super ("Cache Exception"); //fixed default message
- }
-
- public CacheException (String message)
- {
- super (message);
- }
-
- public CacheException (Exception cause)
- {
- super (cause);
- }
-
- public CacheException (String message, Exception cause)
- {
- super (message, cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheIOException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheIOException.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheIOException.java
deleted file mode 100644
index 6efda8d..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheIOException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-
-public class CacheIOException extends CacheException
-{
- //Exception type declared by the low-level evict/restore methods of CacheableData;
- //every constructor simply delegates to CacheException.
- private static final long serialVersionUID = 1L;
-
- public CacheIOException ()
- {
- super (); //uses the default message of CacheException
- }
-
- public CacheIOException (String message)
- {
- super (message);
- }
-
- public CacheIOException (Exception cause)
- {
- super (cause);
- }
-
- public CacheIOException (String message, Exception cause)
- {
- super (message, cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheStatistics.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheStatistics.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheStatistics.java
deleted file mode 100644
index e12b586..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheStatistics.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This singleton provides basic caching statistics in CP.
- * 1) Hit statistics for caching (mem, fsbuff, fs, hdfs, total)
- * 2) Write statistics for caching (fsbuff, fs, hdfs)
- * 3) Time statistics in nanoseconds (acquireR, acquireM, release, export)
- *
- * NOTE: In order to provide accurate statistics in multi-threaded execution,
- * synchronized increments are required. Since those functions are
- * called potentially very often, we use atomic increments
- * (compare and swap) instead of heavy-weight 'synchronized' methods.
- */
-public class CacheStatistics
-{
-
- //enum used for MR counters
- public enum Stat {
- CACHE_HITS_MEM,
- CACHE_HITS_FSBUFF,
- CACHE_HITS_FS,
- CACHE_HITS_HDFS,
- CACHE_WRITES_FSBUFF,
- CACHE_WRITES_FS,
- CACHE_WRITES_HDFS,
- CACHE_TIME_ACQR, //acquire read
- CACHE_TIME_ACQM, //acquire modify (NOTE(review): comment previously read 'acquire read'; M presumably = modify)
- CACHE_TIME_RLS, //release
- CACHE_TIME_EXP, //export
- }
-
- //hit statistics (for acquire read)
- private static AtomicLong _numHitsTotal = null;
- private static AtomicLong _numHitsMem = null;
- private static AtomicLong _numHitsFSBuff = null;
- private static AtomicLong _numHitsFS = null;
- private static AtomicLong _numHitsHDFS = null;
-
- //write statistics caching
- private static AtomicLong _numWritesFSBuff = null;
- private static AtomicLong _numWritesFS = null;
- private static AtomicLong _numWritesHDFS = null;
-
- //time statistics caching
- private static AtomicLong _ctimeAcquireR = null; //in nano sec
- private static AtomicLong _ctimeAcquireM = null; //in nano sec
- private static AtomicLong _ctimeRelease = null; //in nano sec
- private static AtomicLong _ctimeExport = null; //in nano sec
-
- static
- {
- reset(); //allocate all counters on class initialization
- }
-
- public static void reset()
- {
- //NOTE(review): replaces the counter objects rather than zeroing them; an increment
- //running concurrently with reset() may be applied to a discarded instance
- _numHitsTotal = new AtomicLong(0);
- _numHitsMem = new AtomicLong(0);
- _numHitsFSBuff = new AtomicLong(0);
- _numHitsFS = new AtomicLong(0);
- _numHitsHDFS = new AtomicLong(0);
-
- _numWritesFSBuff = new AtomicLong(0);
- _numWritesFS = new AtomicLong(0);
- _numWritesHDFS = new AtomicLong(0);
-
- _ctimeAcquireR = new AtomicLong(0);
- _ctimeAcquireM = new AtomicLong(0);
- _ctimeRelease = new AtomicLong(0);
- _ctimeExport = new AtomicLong(0);
- }
-
- public static void incrementTotalHits()
- {
- _numHitsTotal.incrementAndGet();
- }
-
- public static void incrementTotalHits(int delta)
- {
- _numHitsTotal.addAndGet(delta);
- }
-
- public static long getTotalHits()
- {
- return _numHitsTotal.get();
- }
-
- public static void incrementMemHits()
- {
- _numHitsMem.incrementAndGet();
- }
-
- public static void incrementMemHits(int delta)
- {
- _numHitsMem.addAndGet(delta);
- }
-
- public static long getMemHits()
- {
- return _numHitsMem.get();
- }
-
- public static void incrementFSBuffHits()
- {
- _numHitsFSBuff.incrementAndGet();
- }
-
- public static void incrementFSBuffHits( int delta )
- {
- _numHitsFSBuff.addAndGet(delta);
- }
-
- public static long getFSBuffHits()
- {
- return _numHitsFSBuff.get();
- }
-
- public static void incrementFSHits()
- {
- _numHitsFS.incrementAndGet();
- }
-
- public static void incrementFSHits(int delta)
- {
- _numHitsFS.addAndGet(delta);
- }
-
- public static long getFSHits()
- {
- return _numHitsFS.get();
- }
-
- public static void incrementHDFSHits()
- {
- _numHitsHDFS.incrementAndGet();
- }
-
- public static void incrementHDFSHits(int delta)
- {
- _numHitsHDFS.addAndGet(delta);
- }
-
- public static long getHDFSHits()
- {
- return _numHitsHDFS.get();
- }
-
- public static void incrementFSBuffWrites()
- {
- _numWritesFSBuff.incrementAndGet();
- }
-
- public static void incrementFSBuffWrites(int delta)
- {
- _numWritesFSBuff.addAndGet(delta);
- }
-
- public static long getFSBuffWrites()
- {
- return _numWritesFSBuff.get();
- }
-
- public static void incrementFSWrites()
- {
- _numWritesFS.incrementAndGet();
- }
-
- public static void incrementFSWrites(int delta)
- {
- _numWritesFS.addAndGet(delta);
- }
-
- public static long getFSWrites()
- {
- return _numWritesFS.get();
- }
-
- public static void incrementHDFSWrites()
- {
- _numWritesHDFS.incrementAndGet();
- }
-
- public static void incrementHDFSWrites(int delta)
- {
- _numWritesHDFS.addAndGet(delta);
- }
-
- public static long getHDFSWrites()
- {
- return _numWritesHDFS.get();
- }
-
- public static void incrementAcquireRTime(long delta)
- {
- _ctimeAcquireR.addAndGet(delta);
- }
-
- public static long getAcquireRTime()
- {
- return _ctimeAcquireR.get();
- }
-
- public static void incrementAcquireMTime(long delta)
- {
- _ctimeAcquireM.addAndGet(delta);
- }
-
- public static long getAcquireMTime()
- {
- return _ctimeAcquireM.get();
- }
-
- public static void incrementReleaseTime(long delta)
- {
- _ctimeRelease.addAndGet(delta);
- }
-
- public static long getReleaseTime()
- {
- return _ctimeRelease.get();
- }
-
-
- public static void incrementExportTime(long delta)
- {
- _ctimeExport.addAndGet(delta);
- }
-
- public static long getExportTime()
- {
- return _ctimeExport.get();
- }
-
-
- public static String displayHits()
- {
- StringBuilder sb = new StringBuilder(); //result format: mem/fsbuff/fs/hdfs (total hits not included)
- sb.append(_numHitsMem.get());
- sb.append("/");
- sb.append(_numHitsFSBuff.get());
- sb.append("/");
- sb.append(_numHitsFS.get());
- sb.append("/");
- sb.append(_numHitsHDFS.get());
-
-
- return sb.toString();
- }
-
- public static String displayWrites()
- {
- StringBuilder sb = new StringBuilder(); //result format: fsbuff/fs/hdfs
- sb.append(_numWritesFSBuff.get());
- sb.append("/");
- sb.append(_numWritesFS.get());
- sb.append("/");
- sb.append(_numWritesHDFS.get());
-
-
- return sb.toString();
- }
-
- public static String displayTime()
- {
- StringBuilder sb = new StringBuilder(); //result format: acquireR/acquireM/release/export, 3 decimals each
- sb.append(String.format("%.3f", ((double)_ctimeAcquireR.get())/1000000000)); //in sec
- sb.append("/");
- sb.append(String.format("%.3f", ((double)_ctimeAcquireM.get())/1000000000)); //in sec
- sb.append("/");
- sb.append(String.format("%.3f", ((double)_ctimeRelease.get())/1000000000)); //in sec
- sb.append("/");
- sb.append(String.format("%.3f", ((double)_ctimeExport.get())/1000000000)); //in sec
-
- ; //stray empty statement (no effect)
-
- return sb.toString();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheStatusException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheStatusException.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheStatusException.java
deleted file mode 100644
index 1cefff7..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheStatusException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-
-public class CacheStatusException extends CacheException
-{
- //Thrown by CacheableData.acquire/release on lock status conflicts
- //("READ-MODIFY not allowed.", "MODIFY-MODIFY not allowed.", "Redundant release.").
- private static final long serialVersionUID = 1L;
-
- public CacheStatusException (String message)
- {
- super (message);
- }
-
- public CacheStatusException (Exception cause)
- {
- super (cause);
- }
-
- public CacheStatusException (String message, Exception cause)
- {
- super (message, cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheableData.java
deleted file mode 100644
index 2785e64..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/CacheableData.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.LazyWriteBuffer.RPolicy;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDSequence;
-import com.ibm.bi.dml.runtime.instructions.cp.Data;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-
-
-/**
- * Each object of this class is a cache envelope for some large piece of data
- * called "data blob". (I prefer "blob" to "block" to avoid ambiguity.) For
- * example, the body of a matrix can be the data blob. The term "data blob"
- * refers strictly to the cacheable portion of the data object, often excluding
- * metadata and auxiliary parameters, as defined in the subclasses.
- * Under the protection of the envelope, the data blob may be evicted to
- * the file system; then the subclass must set its reference to <code>null</code>
- * to allow Java garbage collection. If other parts of the system continue
- * to keep references to the data blob, its eviction will not release any memory.
- * To make the eviction meaningful, the rest of the system
- * must dispose of all references prior to giving the permission for eviction.
- *
- */
-public abstract class CacheableData extends Data
-{
-
- private static final long serialVersionUID = -413810592207212835L;
-
- protected static final Log LOG = LogFactory.getLog(CacheableData.class.getName());
-
- public static final long CACHING_THRESHOLD = 4*1024; //obj not s.t. caching if below threshold [in bytes]
- public static final double CACHING_BUFFER_SIZE = 0.15; //fraction of local max memory used as LazyWriteBuffer limit
- public static final RPolicy CACHING_BUFFER_POLICY = RPolicy.FIFO;
- public static final boolean CACHING_BUFFER_PAGECACHE = false;
- public static final boolean CACHING_WRITE_CACHE_ON_READ = false;
-
- public static final String CACHING_COUNTER_GROUP_NAME = "SystemML Caching Counters";
-
-
- //flag indicating if caching is turned on (eviction writes only happen if activeFlag is true)
- private static boolean _activeFlag = false;
-
- public static String cacheEvictionLocalFilePath = null; //set during init
- public static String cacheEvictionLocalFilePrefix = "cache";
- public static final String cacheEvictionLocalFileExtension = ".dat";
-
- /**
- * Defines all possible cache status types for a data blob. An object of class
- * {@link CacheableData} can be in one of the following five status types:
- * <code>EMPTY</code>: Either there is no data blob at all, or the data blob
- * resides in a specified import file and has never been downloaded yet.
- * <code>READ</code>: The data blob is in main memory; one or more threads are
- * referencing and reading it (shared "read-only" lock). This status uses a
- * counter. Eviction is NOT allowed.
- * <code>MODIFY</code>: The data blob is in main memory; exactly one thread is
- * referencing and modifying it (exclusive "write" lock). Eviction is NOT allowed.
- * <code>CACHED</code>: The data blob is in main memory, and nobody is using nor referencing it.
- * There is always a persistent recovery object for it.
- * <code>CACHED_NOWRITE</code>: Entered when the last read lock is released with cacheNoWrite=true
- * and the blob is present; acquire() skips restoreBlobIntoMemory() for this status.
- **/
- protected enum CacheStatus {
- EMPTY,
- READ,
- MODIFY,
- CACHED,
- CACHED_NOWRITE,
- };
-
- private static IDSequence _seq = null;
-
- static
- {
- _seq = new IDSequence();
- }
-
- /**
- * The unique (JVM-wide) ID of a cacheable data object; to ensure unique IDs across JVMs, we
- * concatenate filenames with a unique prefix (map task ID).
- */
- private final int _uniqueID;
-
- /**
- * The cache status of the data blob (whether it can be or is evicted, etc.)
- */
- private CacheStatus _cacheStatus = null;
- private int _numReadThreads = 0; //number of currently held read locks (see addOneRead/removeOneRead)
-
- protected CacheableData (DataType dt, ValueType vt)
- {
- super (dt, vt);
-
- _uniqueID = (int)_seq.getNextID(); //sequence value is narrowed to int
-
- _cacheStatus = CacheStatus.EMPTY;
- _numReadThreads = 0;
- }
-
- // --------- ABSTRACT LOW-LEVEL CACHE I/O OPERATIONS ----------
-
- /**
- * Checks if the data blob reference points to some in-memory object.
- * This method is called when releasing the (last) lock. Do not call
- * this method for a blob that has been evicted.
- *
- * @return <code>true</code> if the blob is in main memory and the
- * reference points to it;
- * <code>false</code> if the blob reference is <code>null</code>.
- */
- protected abstract boolean isBlobPresent();
-
- /**
- * Low-level cache I/O method that physically evicts the data blob from
- * main memory. Must be defined by a subclass, never called by users.
- * @param mb matrix block handed to the subclass for eviction (exact role is defined by subclasses)
- *
- * @throws CacheIOException if the eviction fails, the data blob
- * remains as it was at the start.
- */
- protected abstract void evictBlobFromMemory(MatrixBlock mb)
- throws CacheIOException;
-
- /**
- * Low-level cache I/O method that physically restores the data blob to
- * main memory. Must be defined by a subclass, never called by users.
- *
- * @throws CacheIOException if the restore fails, the data blob
- * remains as it was at the start.
- * NOTE(review): this comment formerly also listed CacheAssignmentException,
- * which the method signature does not declare.
- */
- protected abstract void restoreBlobIntoMemory()
- throws CacheIOException;
-
- /**
- * Low-level cache I/O method that deletes the file containing the
- * evicted data blob, without reading it.
- * Must be defined by a subclass, never called by users.
- */
- protected abstract void freeEvictedBlob();
-
- /**
- * Subclass hook; presumably tells whether the blob is smaller than CACHING_THRESHOLD (confirm in subclasses).
- */
- protected abstract boolean isBelowCachingThreshold();
-
-
- // ------------- IMPLEMENTED CACHE LOGIC METHODS --------------
-
- protected int getUniqueCacheID()
- {
- return _uniqueID;
- }
-
- /**
- * This method "acquires the lock" to ensure that the data blob is in main memory
- * (not evicted) while it is being accessed. When called, the method will try to
- * restore the blob if it has been evicted. There are two kinds of locks it may
- * acquire: a shared "read" lock (if isModify is <code>false</code>) or the
- * exclusive "modify" lock (if isModify is <code>true</code>).
- * The method can fail in three ways:
- * (1) if there is lock status conflict;
- * (2) if there is not enough cache memory to restore the blob;
- * (3) if the restore method returns an error.
- * The method locks the data blob in memory (which disables eviction) and updates
- * its last-access timestamp. For the shared "read" lock, acquiring a new lock
- * increments the associated count. The "read" count has to be decremented once
- * the blob is no longer used, which may re-enable eviction. This method has to
- * be called only once per matrix operation and coupled with {@link #release(boolean)},
- * because it increments the lock count and the other method decrements this count.
- * @param isModify : <code>true</code> for the exclusive "modify" lock,
- * <code>false</code> for a shared "read" lock.
- * @param restore if true and the status is CACHED, restoreBlobIntoMemory() is called first
- * @throws CacheException (CacheStatusException) on READ-MODIFY or MODIFY-MODIFY conflicts
- */
- protected void acquire (boolean isModify, boolean restore)
- throws CacheException
- {
- switch ( _cacheStatus )
- {
- case CACHED:
- if(restore)
- restoreBlobIntoMemory(); //no break: continues into the lock-status update below
- case CACHED_NOWRITE:
- case EMPTY:
- if (isModify)
- setModify();
- else
- addOneRead();
- break;
- case READ:
- if (isModify)
- throw new CacheStatusException ("READ-MODIFY not allowed.");
- else
- addOneRead();
- break;
- case MODIFY:
- throw new CacheStatusException ("MODIFY-MODIFY not allowed.");
- }
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Acquired lock on " + this.getDebugName() + ", status: " + this.getStatusAsString() );
- }
-
-
- /**
- * Call this method to permit eviction for the stored data blob, or to
- * decrement its "read" count if it is "read"-locked by other threads.
- * It is expected that you eliminate all external references to the blob
- * prior to calling this method, because otherwise eviction will
- * duplicate the blob, but not release memory. This method has to be
- * called only once per process and coupled with {@link #acquire(boolean, boolean)},
- * because it decrements the lock count and the other method increments
- * the lock count.
- * @param cacheNoWrite if true, the final read release yields CACHED_NOWRITE instead of CACHED
- * @throws CacheException (CacheStatusException "Redundant release.") if no lock is currently held
- */
- protected void release(boolean cacheNoWrite)
- throws CacheException
- {
- switch ( _cacheStatus )
- {
- case EMPTY:
- case CACHED:
- case CACHED_NOWRITE:
- throw new CacheStatusException("Redundant release.");
- case READ:
- removeOneRead( isBlobPresent(), cacheNoWrite );
- break;
- case MODIFY: //cacheNoWrite is not consulted on this path
- if ( isBlobPresent() )
- setCached();
- else
- setEmpty();
- break;
- }
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Released lock on " + this.getDebugName() + ", status: " + this.getStatusAsString());
-
- }
-
-
- // **************************************************
- // *** ***
- // *** CACHE STATUS FIELD - CLASSES AND METHODS ***
- // *** ***
- // **************************************************
-
-
- public String getStatusAsString()
- {
- return _cacheStatus.toString();
- }
-
- //TODO isCached is only public for access from SparkExectionContext, once we can assume
- //the existence of spark libraries, we can move the related code to MatrixObject and
- //make this method protected again
- public boolean isCached(boolean inclCachedNoWrite)
- {
- if( inclCachedNoWrite ) //treat CACHED_NOWRITE as cached
- return (_cacheStatus == CacheStatus.CACHED || _cacheStatus == CacheStatus.CACHED_NOWRITE);
- else
- return (_cacheStatus == CacheStatus.CACHED);
- }
-
- protected boolean isEmpty(boolean inclCachedNoWrite)
- {
- if( inclCachedNoWrite ) //treat CACHED_NOWRITE as empty
- return (_cacheStatus == CacheStatus.EMPTY || _cacheStatus == CacheStatus.CACHED_NOWRITE);
- else
- return (_cacheStatus == CacheStatus.EMPTY);
- }
-
- protected boolean isModify()
- {
- return (_cacheStatus == CacheStatus.MODIFY);
- }
-
- protected void setEmpty()
- {
- _cacheStatus = CacheStatus.EMPTY;
- }
-
- protected void setModify()
- {
- _cacheStatus = CacheStatus.MODIFY;
- }
-
- protected void setCached()
- {
- _cacheStatus = CacheStatus.CACHED;
- }
-
- protected void addOneRead()
- {
- _numReadThreads ++;
- _cacheStatus = CacheStatus.READ;
- }
-
- protected void removeOneRead(boolean doesBlobExist, boolean cacheNoWrite)
- {
- _numReadThreads --;
- if (_numReadThreads == 0) { //status only changes when the last reader leaves
- if( cacheNoWrite )
- _cacheStatus = (doesBlobExist ?
- CacheStatus.CACHED_NOWRITE : CacheStatus.EMPTY);
- else
- _cacheStatus = (doesBlobExist ?
- CacheStatus.CACHED : CacheStatus.EMPTY);
- }
- }
-
- protected boolean isAvailableToRead()
- {
- return ( _cacheStatus == CacheStatus.EMPTY
- || _cacheStatus == CacheStatus.CACHED
- || _cacheStatus == CacheStatus.CACHED_NOWRITE
- || _cacheStatus == CacheStatus.READ);
- }
-
- protected boolean isAvailableToModify()
- {
- return ( _cacheStatus == CacheStatus.EMPTY
- || _cacheStatus == CacheStatus.CACHED
- || _cacheStatus == CacheStatus.CACHED_NOWRITE);
- }
-
- // --------- STATIC CACHE INIT/CLEANUP OPERATIONS ----------
-
-
- /**
- * Cleans up the LazyWriteBuffer, then removes cache files and the cache directory via cleanupCacheDir(true).
- */
- public synchronized static void cleanupCacheDir()
- {
- //cleanup remaining cached writes
- LazyWriteBuffer.cleanup();
-
- //delete cache dir and files
- cleanupCacheDir(true);
- }
-
- /**
- * Deletes files starting with cacheEvictionLocalFilePrefix from the caching working dir
- * and turns caching off (_activeFlag=false).
- * @param withDir if true, also try to delete the directory itself
- */
- public synchronized static void cleanupCacheDir(boolean withDir)
- {
- //get directory name
- String dir = cacheEvictionLocalFilePath;
-
- //clean files with cache prefix
- if( dir != null ) //if previous init cache
- {
- File fdir = new File(dir);
- if( fdir.exists()){ //just for robustness
- File[] files = fdir.listFiles(); //NOTE(review): File.listFiles() may return null (not a directory / I/O error); the loop below would then throw NPE
- for( File f : files )
- if( f.getName().startsWith(cacheEvictionLocalFilePrefix) )
- f.delete(); //result of delete() is ignored
- if( withDir )
- fdir.delete(); //deletes dir only if empty
- }
- }
-
- _activeFlag = false;
- }
-
- /**
- * Inits caching with the default uuid of DMLScript
- * @throws IOException
- */
- public synchronized static void initCaching()
- throws IOException
- {
- initCaching(DMLScript.getUUID());
- }
-
- /**
- * Creates the DML-script-specific caching working dir.
- *
- * Takes the UUID in order to allow for custom uuid, e.g., for remote parfor caching
- * @param uuid NOTE(review): not referenced anywhere in this method body
- * @throws IOException wrapping any DMLRuntimeException raised while creating the dir
- */
- public synchronized static void initCaching( String uuid )
- throws IOException
- {
- try
- {
- String dir = LocalFileUtils.getWorkingDir( LocalFileUtils.CATEGORY_CACHE );
- LocalFileUtils.createLocalFileIfNotExist(dir);
- cacheEvictionLocalFilePath = dir;
- }
- catch(DMLRuntimeException e)
- {
- throw new IOException(e);
- }
-
- //init write-ahead buffer
- LazyWriteBuffer.init();
-
- _activeFlag = true; //turn on caching
- }
-
- public static synchronized boolean isCachingActive()
- {
- return _activeFlag;
- }
-
- public static synchronized void disableCaching()
- {
- _activeFlag = false;
- }
-
- public static synchronized void enableCaching()
- {
- _activeFlag = true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/LazyWriteBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/LazyWriteBuffer.java
deleted file mode 100644
index 3d60788..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map.Entry;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-
-/**
- *
- *
- */
-public class LazyWriteBuffer
-{
-
- public enum RPolicy{
- FIFO,
- LRU
- }
-
- //global size limit in bytes
- private static long _limit;
-
- //current size in bytes
- private static long _size;
-
- //eviction queue of <filename,buffer> pairs (implemented via linked hash map
- //for (1) queue semantics and (2) constant time get/insert/delete operations)
- private static EvictionQueue _mQueue;
-
- static
- {
- //obtain the logical buffer size in bytes
- long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
- _limit = (long)(CacheableData.CACHING_BUFFER_SIZE * maxMem);
- }
-
- /**
- *
- * @param fname
- * @param mb
- * @throws IOException
- */
- public static void writeMatrix( String fname, MatrixBlock mb )
- throws IOException
- {
- long lSize = mb.getExactSizeOnDisk();
- boolean requiresWrite = ( lSize > _limit //global buffer limit
- || !ByteBuffer.isValidCapacity(lSize, mb) ); //local buffer limit
-
- if( !requiresWrite ) //if it fits in writebuffer
- {
- ByteBuffer bbuff = null;
-
- //modify buffer pool
- synchronized( _mQueue )
- {
- //evict matrices to make room (by default FIFO)
- while( _size+lSize >= _limit )
- {
- //remove first entry from eviction queue
- Entry<String, ByteBuffer> entry = _mQueue.removeFirst();
- String ftmp = entry.getKey();
- ByteBuffer tmp = entry.getValue();
-
- if( tmp != null )
- {
- //wait for pending serialization
- tmp.checkSerialized();
-
- //evict matrix
- tmp.evictBuffer(ftmp);
- tmp.freeMemory();
- _size-=tmp.getSize();
-
- if( DMLScript.STATISTICS )
- CacheStatistics.incrementFSWrites();
- }
- }
-
- //create buffer (reserve mem), and lock
- bbuff = new ByteBuffer( lSize );
-
- //put placeholder into buffer pool
- _mQueue.addLast(fname, bbuff);
- _size += lSize;
- }
-
- //serialize matrix (outside synchronized critical path)
- bbuff.serializeMatrix(mb);
-
- if( DMLScript.STATISTICS )
- CacheStatistics.incrementFSBuffWrites();
- }
- else
- {
- //write directly to local FS (bypass buffer if too large)
- LocalFileUtils.writeMatrixBlockToLocal(fname, mb);
- if( DMLScript.STATISTICS )
- CacheStatistics.incrementFSWrites();
- }
- }
-
- /**
- *
- * @param fname
- */
- public static void deleteMatrix( String fname )
- {
- boolean requiresDelete = true;
-
- synchronized( _mQueue )
- {
- //remove queue entry
- ByteBuffer ldata = _mQueue.remove(fname);
- if( ldata != null )
- {
- _size -= ldata.getSize();
- requiresDelete = false;
- ldata.freeMemory(); //cleanup
- }
- }
-
- //delete from FS if required
- if( requiresDelete )
- LocalFileUtils.deleteFileIfExists(fname, true);
- }
-
- /**
- *
- * @param fname
- * @return
- * @throws IOException
- */
- public static MatrixBlock readMatrix( String fname )
- throws IOException
- {
- MatrixBlock mb = null;
- ByteBuffer ldata = null;
-
- //probe write buffer
- synchronized( _mQueue )
- {
- ldata = _mQueue.get(fname);
-
- //modify eviction order (accordingly to access)
- if( CacheableData.CACHING_BUFFER_POLICY == RPolicy.LRU
- && ldata != null )
- {
- //reinsert entry at end of eviction queue
- _mQueue.remove( fname );
- _mQueue.addLast( fname, ldata );
- }
- }
-
- //deserialize or read from FS if required
- if( ldata != null )
- {
- mb = ldata.deserializeMatrix();
- if( DMLScript.STATISTICS )
- CacheStatistics.incrementFSBuffHits();
- }
- else
- {
- mb = LocalFileUtils.readMatrixBlockFromLocal(fname); //read from FS
- if( DMLScript.STATISTICS )
- CacheStatistics.incrementFSHits();
- }
-
- return mb;
- }
-
- /**
- *
- */
- public static void init()
- {
- _mQueue = new EvictionQueue();
- _size = 0;
- if( CacheableData.CACHING_BUFFER_PAGECACHE )
- PageCache.init();
- }
-
- /**
- *
- */
- public static void cleanup()
- {
- if( _mQueue!=null )
- _mQueue.clear();
- if( CacheableData.CACHING_BUFFER_PAGECACHE )
- PageCache.clear();
- }
-
- /**
- *
- * @return
- */
- public static long getWriteBufferSize()
- {
- long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
- return (long)(CacheableData.CACHING_BUFFER_SIZE * maxMem);
- }
-
- /**
- *
- */
- public static void printStatus( String position )
- {
- System.out.println("WRITE BUFFER STATUS ("+position+") --");
-
- //print buffer meta data
- System.out.println("\tWB: Buffer Meta Data: " +
- "limit="+_limit+", " +
- "size[bytes]="+_size+", " +
- "size[elements]="+_mQueue.size()+"/"+_mQueue.size());
-
- //print current buffer entries
- int count = _mQueue.size();
- for( Entry<String, ByteBuffer> entry : _mQueue.entrySet() )
- {
- String fname = entry.getKey();
- ByteBuffer bbuff = entry.getValue();
-
- System.out.println("\tWB: buffer element ("+count+"): "+fname+", "+bbuff.getSize()+", "+bbuff.isInSparseFormat());
- count--;
- }
- }
-
- /**
- * Extended LinkedHashMap with convenience methods for adding and removing
- * last/first entries.
- *
- */
- private static class EvictionQueue extends LinkedHashMap<String, ByteBuffer>
- {
- private static final long serialVersionUID = -5208333402581364859L;
-
- public void addLast( String fname, ByteBuffer bbuff )
- {
- //put entry into eviction queue w/ 'addLast' semantics
- put(fname, bbuff);
- }
-
- public Entry<String, ByteBuffer> removeFirst()
- {
- //move iterator to first entry
- Iterator<Entry<String, ByteBuffer>> iter = entrySet().iterator();
- Entry<String, ByteBuffer> entry = iter.next();
-
- //remove current iterator entry
- iter.remove();
-
- return entry;
- }
- }
-}