You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/07/28 02:46:45 UTC

systemml git commit: [SYSTEMML-2420] Performance paramserv data serialize/deserialize

Repository: systemml
Updated Branches:
  refs/heads/master dfa27ba22 -> 4225fd8b9


[SYSTEMML-2420] Performance paramserv data serialize/deserialize

This patch makes a minor performance improvement to the serialization
and deserialization code path of distributed paramserv. Specifically, we
now use cache data outputs and byte buffer data inputs which avoids
unnecessary byte array copies and thus also reduces unnecessary garbage
collection overheads. Furthermore, this also includes an extension of all
of our custom data inputs/outputs to properly support UTF in all
scenarios.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4225fd8b
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4225fd8b
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4225fd8b

Branch: refs/heads/master
Commit: 4225fd8b96deb215072385a2e00f7694ac86e036
Parents: dfa27ba
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Jul 27 19:47:37 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Jul 27 19:47:37 2018 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheDataInput.java  |  28 ++--
 .../controlprogram/caching/CacheDataOutput.java |  14 +-
 .../paramserv/spark/SparkPSWorker.java          |  14 +-
 .../paramserv/spark/rpc/PSRpcCall.java          |  19 +--
 .../paramserv/spark/rpc/PSRpcObject.java        |   8 +-
 .../paramserv/spark/rpc/PSRpcResponse.java      |  21 +--
 .../sysml/runtime/util/ByteBufferDataInput.java | 156 +++++++++++++++++++
 .../util/FastBufferedDataInputStream.java       |   9 +-
 .../util/FastBufferedDataOutputStream.java      |   2 +-
 9 files changed, 213 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
index dde2211..7fa8322 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.controlprogram.caching;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.IOException;
 
 import org.apache.sysml.runtime.io.IOUtilFunctions;
@@ -28,13 +29,11 @@ import org.apache.sysml.runtime.matrix.data.SparseBlock;
 
 public class CacheDataInput implements DataInput, MatrixBlockDataInput
 {
-	protected byte[] _buff;
-	protected int _bufflen;
+	protected final byte[] _buff;
 	protected int _count;
 
-	public CacheDataInput( byte[] mem ) {
+	public CacheDataInput(byte[] mem) {
 		_buff = mem;
-		_bufflen = _buff.length;
 		_count = 0;
 	}
 
@@ -72,12 +71,16 @@ public class CacheDataInput implements DataInput, MatrixBlockDataInput
 
 	@Override
 	public short readShort() throws IOException {
-		throw new IOException("Not supported.");
+		int ret = baToShort(_buff, _count);
+		_count += 2;
+		return (short) ret;
 	}
 
 	@Override
 	public int readUnsignedShort() throws IOException {
-		throw new IOException("Not supported.");
+		int ret = baToShort(_buff, _count);
+		_count += 2;
+		return ret;
 	}
 
 	@Override
@@ -107,13 +110,10 @@ public class CacheDataInput implements DataInput, MatrixBlockDataInput
 	}
 
 	@Override
-	public double readDouble() 
-		throws IOException 
-	{
+	public double readDouble() throws IOException {
 		long tmp = baToLong(_buff, _count);
 		double tmp2 = Double.longBitsToDouble(tmp);
 		_count += 8;
-		
 		return tmp2;
 	}
 
@@ -124,12 +124,12 @@ public class CacheDataInput implements DataInput, MatrixBlockDataInput
 
 	@Override
 	public String readUTF() throws IOException {
-		throw new IOException("Not supported.");
+		return DataInputStream.readUTF(this);
 	}
 	
-    ///////////////////////////////////////////////
-    // Implementation of MatrixBlockDSMDataOutput
-    ///////////////////////////////////////////////	
+	///////////////////////////////////////////////
+	// Implementation of MatrixBlockDSMDataOutput
+	///////////////////////////////////////////////
 	
 	@Override
 	public long readDoubleArray(int len, double[] varr) 

http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java
index 1b740d2..6e16764 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java
@@ -33,13 +33,15 @@ import org.apache.sysml.runtime.matrix.data.SparseBlock;
  */
 public class CacheDataOutput implements DataOutput, MatrixBlockDataOutput 
 {
-	protected byte[] _buff;
-	protected int _bufflen;
+	protected final byte[] _buff;
 	protected int _count;
 
-	public CacheDataOutput( byte[] mem ) {
+	public CacheDataOutput(int size) {
+		this(new byte[size]);
+	}
+	
+	public CacheDataOutput(byte[] mem) {
 		_buff = mem;
-		_bufflen = _buff.length;
 		_count = 0;
 	}
 	
@@ -197,6 +199,10 @@ public class CacheDataOutput implements DataOutput, MatrixBlockDataOutput
 		for( int i=lrlen; i<rlen; i++ )
 			writeInt( 0 );
 	}
+	
+	public byte[] getBytes() {
+		return _buff;
+	}
 
 	private static void shortToBa( final int val, byte[] ba, final int off ) {
 		IOUtilFunctions.shortToBa(val, ba, off);

http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
index 5732a4d..cb3e729 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
@@ -135,17 +135,20 @@ public class SparkPSWorker extends LocalPSWorker implements VoidFunction<Tuple2<
 	
 	@Override
 	protected void accLocalModelUpdateTime(Timing time) {
-		_aUpdate.add((long) time.stop());
+		if( time != null )
+			_aUpdate.add((long) time.stop());
 	}
 
 	@Override
 	protected void accBatchIndexingTime(Timing time) {
-		_aIndex.add((long) time.stop());
+		if( time != null )
+			_aIndex.add((long) time.stop());
 	}
 
 	@Override
 	protected void accGradientComputeTime(Timing time) {
-		_aGrad.add((long) time.stop());
+		if( time != null )
+			_aGrad.add((long) time.stop());
 	}
 	
 	@Override
@@ -158,7 +161,8 @@ public class SparkPSWorker extends LocalPSWorker implements VoidFunction<Tuple2<
 		_nBatches.add(n);
 	}
 	
-	private void accSetupTime(Timing tSetup) {
-		_aSetup.add((long) tSetup.stop());
+	private void accSetupTime(Timing time) {
+		if( time != null )
+			_aSetup.add((long) time.stop());
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcCall.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcCall.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcCall.java
index b8f482c..a33fda2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcCall.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcCall.java
@@ -19,16 +19,13 @@
 
 package org.apache.sysml.runtime.controlprogram.paramserv.spark.rpc;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.CacheDataOutput;
 import org.apache.sysml.runtime.instructions.cp.ListObject;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
+import org.apache.sysml.runtime.util.ByteBufferDataInput;
 
 public class PSRpcCall extends PSRpcObject {
 
@@ -59,26 +56,22 @@ public class PSRpcCall extends PSRpcObject {
 	}
 	
 	public void deserialize(ByteBuffer buffer) throws IOException {
-		DataInputStream dis = new DataInputStream(
-			new ByteArrayInputStream(IOUtilFunctions.getBytes(buffer)));
+		ByteBufferDataInput dis = new ByteBufferDataInput(buffer);
 		_method = dis.readInt();
 		validateMethod(_method);
 		_workerID = dis.readInt();
 		if (dis.available() > 1)
 			_data = readAndDeserialize(dis);
-		dis.close();
 	}
 
 	public ByteBuffer serialize() throws IOException {
-		//TODO: Perf: use CacheDataOutput to avoid multiple copies (needs UTF handling)
-		ByteArrayOutputStream bos = new ByteArrayOutputStream(getApproxSerializedSize(_data));
-		FastBufferedDataOutputStream dos = new FastBufferedDataOutputStream(bos);
+		int len = 8 + getExactSerializedSize(_data);
+		CacheDataOutput dos = new CacheDataOutput(len);
 		dos.writeInt(_method);
 		dos.writeInt(_workerID);
 		if (_data != null)
 			serializeAndWriteListObject(_data, dos);
-		dos.flush();
-		return ByteBuffer.wrap(bos.toByteArray());
+		return ByteBuffer.wrap(dos.getBytes());
 	}
 	
 	private void validateMethod(int method) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcObject.java
index 7d3353f..411822f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcObject.java
@@ -31,6 +31,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.instructions.cp.ListObject;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 public abstract class PSRpcObject {
@@ -80,12 +81,11 @@ public abstract class PSRpcObject {
 	 * @param lo list object
 	 * @return serialization size
 	 */
-	protected int getApproxSerializedSize(ListObject lo) {
+	protected int getExactSerializedSize(ListObject lo) {
 		if( lo == null ) return 0;
 		long result = 4 + 1; // list length and of named
-		result += lo.getLength() * (Integer.BYTES); // bytes for the size of names
-		if (lo.isNamedList())
-			result += lo.getNames().stream().mapToLong(s -> s.length()).sum();
+		if (lo.isNamedList()) //size for names incl length
+			result += lo.getNames().stream().mapToLong(s -> IOUtilFunctions.getUTFSize(s)).sum();
 		result += lo.getData().stream().mapToLong(d ->
 			((MatrixObject)d).acquireReadAndRelease().getExactSizeOnDisk()).sum();
 		if( result > Integer.MAX_VALUE )

http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcResponse.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcResponse.java
index 3517491..010481e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcResponse.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcResponse.java
@@ -19,15 +19,13 @@
 
 package org.apache.sysml.runtime.controlprogram.paramserv.spark.rpc;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.sysml.runtime.util.ByteBufferDataInput;
+import org.apache.sysml.runtime.controlprogram.caching.CacheDataOutput;
 import org.apache.sysml.runtime.instructions.cp.ListObject;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
 
 public class PSRpcResponse extends PSRpcObject {
 	public enum Type  {
@@ -68,8 +66,7 @@ public class PSRpcResponse extends PSRpcObject {
 
 	@Override
 	public void deserialize(ByteBuffer buffer) throws IOException {
-		DataInputStream dis = new DataInputStream(
-			new ByteArrayInputStream(IOUtilFunctions.getBytes(buffer)));
+		ByteBufferDataInput dis = new ByteBufferDataInput(buffer);
 		_status = Type.values()[dis.readInt()];
 		switch (_status) {
 			case SUCCESS:
@@ -81,16 +78,13 @@ public class PSRpcResponse extends PSRpcObject {
 				_data = dis.readUTF();
 				break;
 		}
-		dis.close();
 	}
 
 	@Override
 	public ByteBuffer serialize() throws IOException {
-		//TODO: Perf: use CacheDataOutput to avoid multiple copies (needs UTF handling)
-		int len = 4 + (_status==Type.SUCCESS ? getApproxSerializedSize((ListObject)_data) :
-			_status==Type.SUCCESS_EMPTY ? 0 : ((String)_data).length());
-		ByteArrayOutputStream bos = new ByteArrayOutputStream(len);
-		FastBufferedDataOutputStream dos = new FastBufferedDataOutputStream(bos);
+		int len = 4 + (_status==Type.SUCCESS ? getExactSerializedSize((ListObject)_data) :
+			_status==Type.SUCCESS_EMPTY ? 0 : IOUtilFunctions.getUTFSize((String)_data));
+		CacheDataOutput dos = new CacheDataOutput(len);
 		dos.writeInt(_status.ordinal());
 		switch (_status) {
 			case SUCCESS:
@@ -102,7 +96,6 @@ public class PSRpcResponse extends PSRpcObject {
 				dos.writeUTF(_data.toString());
 				break;
 		}
-		dos.flush();
-		return ByteBuffer.wrap(bos.toByteArray());
+		return ByteBuffer.wrap(dos.getBytes());
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/util/ByteBufferDataInput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/ByteBufferDataInput.java b/src/main/java/org/apache/sysml/runtime/util/ByteBufferDataInput.java
new file mode 100644
index 0000000..a1aab1c
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/util/ByteBufferDataInput.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.sysml.runtime.matrix.data.MatrixBlockDataInput;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+
+public class ByteBufferDataInput implements DataInput, MatrixBlockDataInput
+{
+	protected final ByteBuffer _buff;
+
+	public ByteBufferDataInput(ByteBuffer buff) {
+		_buff = buff;
+	}
+
+	public int available() {
+		return _buff.limit() - _buff.position();
+	}
+	
+	@Override
+	public void readFully(byte[] b) throws IOException {
+		_buff.get(b);
+	}
+
+	@Override
+	public void readFully(byte[] b, int off, int len) throws IOException {
+		_buff.get(b, off, len);
+	}
+
+	@Override
+	public int skipBytes(int n) throws IOException {
+		_buff.position(_buff.position()+n);
+		return n;
+	}
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		//mask to adhere to the input stream semantic
+		return ( (_buff.get() & 0xFF) != 0 );
+	}
+
+	@Override
+	public byte readByte() throws IOException {
+		//mask to adhere to the input stream semantic
+		return (byte) (_buff.get() & 0xFF);
+	}
+
+	@Override
+	public int readUnsignedByte() throws IOException {
+		throw new IOException("Not supported.");
+	}
+
+	@Override
+	public short readShort() throws IOException {
+		return _buff.getShort();
+	}
+
+	@Override
+	public int readUnsignedShort() throws IOException {
+		return _buff.getChar();
+	}
+
+	@Override
+	public char readChar() throws IOException {
+		return _buff.getChar();
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		return _buff.getInt();
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		return _buff.getLong();
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return _buff.getFloat();
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return _buff.getDouble();
+	}
+
+	@Override
+	public String readLine() throws IOException {
+		throw new IOException("Not supported.");
+	}
+
+	@Override
+	public String readUTF() throws IOException {
+		return DataInputStream.readUTF(this);
+	}
+	
+	///////////////////////////////////////////////
+	// Implementation of MatrixBlockDSMDataOutput
+	///////////////////////////////////////////////
+	
+	@Override
+	public long readDoubleArray(int len, double[] varr) throws IOException  {
+		long nnz = 0;
+		for( int i=0; i<len; i++ )
+			nnz += (varr[i] = _buff.getDouble()) != 0 ? 1 : 0;
+		return nnz;
+	}
+
+	@Override
+	public long readSparseRows(int rlen, long nnz, SparseBlock rows) 
+		throws IOException 
+	{
+		//counter for non-zero elements
+		long gnnz = 0;
+		
+		//read all individual sparse rows from input
+		for( int i=0; i<rlen; i++ ) {
+			int lnnz = _buff.getInt();
+			if( lnnz > 0 ) { //non-zero row
+				rows.allocate(i, lnnz); //preallocate row
+				for( int j=0; j<lnnz; j++ ) //read single sparse row
+					rows.append(i, _buff.getInt(), _buff.getDouble());
+				gnnz += lnnz;
+			}
+		}
+		
+		//sanity check valid number of read nnz
+		if( gnnz != nnz )
+			throw new IOException("Invalid number of read nnz: "+gnnz+" vs "+nnz);
+		
+		return nnz;
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
index 9069859..dc56f13 100644
--- a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
+++ b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.util;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -91,12 +92,14 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da
 
 	@Override
 	public short readShort() throws IOException {
-		throw new IOException("Not supported.");
+		readFully(_buff, 0, 2);
+		return (short) baToShort(_buff, 0);
 	}
 
 	@Override
 	public int readUnsignedShort() throws IOException {
-		throw new IOException("Not supported.");
+		readFully(_buff, 0, 2);
+		return baToShort(_buff, 0);
 	}
 
 	@Override
@@ -136,7 +139,7 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da
 
 	@Override
 	public String readUTF() throws IOException {
-		throw new IOException("Not supported.");
+		return DataInputStream.readUTF(this);
 	}
 
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/4225fd8b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
index dd70d8f..9d695a2 100644
--- a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
+++ b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
@@ -186,7 +186,7 @@ public class FastBufferedDataOutputStream extends FilterOutputStream implements
 		//write utf payload
 		for( int i=0; i<slen; i++ ) {
 			if (_count+3 > _bufflen)
-			    flushBuffer();
+				flushBuffer();
 			char c = s.charAt(i);
 			if( c>= 0x0001 && c<=0x007F ) //1 byte range
 				_buff[_count++] = (byte) c;