You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:54 UTC
[17/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
deleted file mode 100644
index 064841f..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-
-import java.io.IOException;
-
-public abstract class FileAppender implements Appender {
- protected boolean inited = false;
-
- protected final Configuration conf;
- protected final TableMeta meta;
- protected final Schema schema;
- protected final Path path;
-
- protected boolean enabledStats;
-
- public FileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) {
- this.conf = conf;
- this.meta = meta;
- this.schema = schema;
- this.path = path;
- }
-
- public void init() throws IOException {
- if (inited) {
- throw new IllegalStateException("FileAppender is already initialized.");
- }
- inited = true;
- }
-
- public void enableStats() {
- if (inited) {
- throw new IllegalStateException("Should enable this option before init()");
- }
-
- this.enabledStats = true;
- }
-
- public abstract long getOffset() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
deleted file mode 100644
index c831822..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-
-public abstract class FileScanner implements Scanner {
- private static final Log LOG = LogFactory.getLog(FileScanner.class);
-
- protected boolean inited = false;
- protected final Configuration conf;
- protected final TableMeta meta;
- protected final Schema schema;
- protected final FileFragment fragment;
- protected final int columnNum;
-
- protected Column [] targets;
-
- public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) {
- this.conf = conf;
- this.meta = meta;
- this.schema = schema;
- this.fragment = fragment;
- this.columnNum = this.schema.getColumnNum();
- }
-
- public void init() throws IOException {
- inited = true;
- }
-
- @Override
- public Schema getSchema() {
- return schema;
- }
-
- @Override
- public void setTarget(Column[] targets) {
- if (inited) {
- throw new IllegalStateException("Should be called before init()");
- }
- this.targets = targets;
- }
-
- public void setSearchCondition(Object expr) {
- if (inited) {
- throw new IllegalStateException("Should be called before init()");
- }
- }
-
- public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
- String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
- FileSystem fs;
- if(tajoUser != null) {
- try {
- fs = FileSystem.get(path.toUri(), tajoConf, tajoUser);
- } catch (InterruptedException e) {
- LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]");
- fs = FileSystem.get(path.toUri(), tajoConf);
- }
- } else {
- fs = FileSystem.get(path.toUri(), tajoConf);
- }
-
- return fs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
deleted file mode 100644
index f05a316..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.exception.UnsupportedException;
-
-import java.net.InetAddress;
-
-/**
- * An instance of FrameTuple is an immutable tuple.
- * It contains two tuples and pretends to be one instance of Tuple for
- * join qual evaluatations.
- */
-public class FrameTuple implements Tuple, Cloneable {
- private int size;
- private int leftSize;
-
- private Tuple left;
- private Tuple right;
-
- public FrameTuple() {}
-
- public FrameTuple(Tuple left, Tuple right) {
- set(left, right);
- }
-
- public void set(Tuple left, Tuple right) {
- this.size = left.size() + right.size();
- this.left = left;
- this.leftSize = left.size();
- this.right = right;
- }
-
- @Override
- public int size() {
- return size;
- }
-
- @Override
- public boolean contains(int fieldId) {
- Preconditions.checkArgument(fieldId < size,
- "Out of field access: " + fieldId);
-
- if (fieldId < leftSize) {
- return left.contains(fieldId);
- } else {
- return right.contains(fieldId - leftSize);
- }
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return get(fieldid) instanceof NullDatum;
- }
-
- @Override
- public void clear() {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Datum value) {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException();
- }
-
- @Override
- public void setOffset(long offset) {
- throw new UnsupportedException();
- }
-
- @Override
- public long getOffset() {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(Datum [] values) {
- throw new UnsupportedException();
- }
-
- @Override
- public Datum get(int fieldId) {
- Preconditions.checkArgument(fieldId < size,
- "Out of field access: " + fieldId);
-
- if (fieldId < leftSize) {
- return left.get(fieldId);
- } else {
- return right.get(fieldId - leftSize);
- }
- }
-
- @Override
- public BooleanDatum getBoolean(int fieldId) {
- return (BooleanDatum) get(fieldId);
- }
-
- @Override
- public BitDatum getByte(int fieldId) {
- return (BitDatum) get(fieldId);
- }
-
- @Override
- public CharDatum getChar(int fieldId) {
- return (CharDatum) get(fieldId);
- }
-
- @Override
- public BlobDatum getBytes(int fieldId) {
- return (BlobDatum) get(fieldId);
- }
-
- @Override
- public Int2Datum getShort(int fieldId) {
- return (Int2Datum) get(fieldId);
- }
-
- @Override
- public Int4Datum getInt(int fieldId) {
- return (Int4Datum) get(fieldId);
- }
-
- @Override
- public Int8Datum getLong(int fieldId) {
- return (Int8Datum) get(fieldId);
- }
-
- @Override
- public Float4Datum getFloat(int fieldId) {
- return (Float4Datum) get(fieldId);
- }
-
- @Override
- public Float8Datum getDouble(int fieldId) {
- return (Float8Datum) get(fieldId);
- }
-
- @Override
- public Inet4Datum getIPv4(int fieldId) {
- return (Inet4Datum) get(fieldId);
- }
-
- @Override
- public byte[] getIPv4Bytes(int fieldId) {
- return get(fieldId).asByteArray();
- }
-
- @Override
- public InetAddress getIPv6(int fieldId) {
- throw new UnimplementedException();
- }
-
- @Override
- public byte[] getIPv6Bytes(int fieldId) {
- throw new UnimplementedException();
- }
-
- @Override
- public TextDatum getString(int fieldId) {
- return (TextDatum) get(fieldId);
- }
-
- @Override
- public TextDatum getText(int fieldId) {
- return (TextDatum) get(fieldId);
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- FrameTuple frameTuple = (FrameTuple) super.clone();
- frameTuple.set(this.left.clone(), this.right.clone());
- return frameTuple;
- }
-
- @Override
- public Datum[] getValues(){
- throw new UnsupportedException();
- }
-
- public String toString() {
- boolean first = true;
- StringBuilder str = new StringBuilder();
- str.append("(");
- for(int i=0; i < size(); i++) {
- if(contains(i)) {
- if(first) {
- first = false;
- } else {
- str.append(", ");
- }
- str.append(i)
- .append("=>")
- .append(get(i));
- }
- }
- str.append(")");
- return str.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
deleted file mode 100644
index 4d484df..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.exception.InvalidCastException;
-
-import java.net.InetAddress;
-import java.util.Arrays;
-
-public class LazyTuple implements Tuple, Cloneable {
- private long offset;
- private Datum[] values;
- private byte[][] textBytes;
- private Schema schema;
- private byte[] nullBytes;
- private SerializerDeserializer serializeDeserialize;
-
- public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
- this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
- }
-
- public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
- this.schema = schema;
- this.textBytes = textBytes;
- this.values = new Datum[schema.getColumnNum()];
- this.offset = offset;
- this.nullBytes = nullBytes;
- this.serializeDeserialize = serde;
- }
-
- public LazyTuple(LazyTuple tuple) {
- this.values = tuple.getValues();
- this.offset = tuple.offset;
- this.schema = tuple.schema;
- this.textBytes = new byte[size()][];
- this.nullBytes = tuple.nullBytes;
- this.serializeDeserialize = tuple.serializeDeserialize;
- }
-
- @Override
- public int size() {
- return values.length;
- }
-
- @Override
- public boolean contains(int fieldid) {
- return textBytes[fieldid] != null || values[fieldid] != null;
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return get(fieldid) instanceof NullDatum;
- }
-
- @Override
- public void clear() {
- for (int i = 0; i < values.length; i++) {
- values[i] = null;
- textBytes[i] = null;
- }
- }
-
- //////////////////////////////////////////////////////
- // Setter
- //////////////////////////////////////////////////////
- @Override
- public void put(int fieldId, Datum value) {
- values[fieldId] = value;
- textBytes[fieldId] = null;
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- for (int i = fieldId, j = 0; j < values.length; i++, j++) {
- this.values[i] = values[j];
- }
- this.textBytes = new byte[values.length][];
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
- values[i] = tuple.get(j);
- textBytes[i] = null;
- }
- }
-
- @Override
- public void put(Datum[] values) {
- System.arraycopy(values, 0, this.values, 0, size());
- this.textBytes = new byte[values.length][];
- }
-
- //////////////////////////////////////////////////////
- // Getter
- //////////////////////////////////////////////////////
- @Override
- public Datum get(int fieldId) {
- if (values[fieldId] != null)
- return values[fieldId];
- else if (textBytes.length <= fieldId) {
- values[fieldId] = NullDatum.get(); // split error. (col : 3, separator: ',', row text: "a,")
- } else if (textBytes[fieldId] != null) {
- try {
- values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
- textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
- } catch (Exception e) {
- values[fieldId] = NullDatum.get();
- }
- textBytes[fieldId] = null;
- } else {
- //non-projection
- }
- return values[fieldId];
- }
-
- @Override
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- @Override
- public long getOffset() {
- return this.offset;
- }
-
- @Override
- public BooleanDatum getBoolean(int fieldId) {
- return (BooleanDatum) get(fieldId);
- }
-
- @Override
- public BitDatum getByte(int fieldId) {
- return (BitDatum) get(fieldId);
- }
-
- @Override
- public CharDatum getChar(int fieldId) {
- return (CharDatum) get(fieldId);
- }
-
- @Override
- public BlobDatum getBytes(int fieldId) {
- return (BlobDatum) get(fieldId);
- }
-
- @Override
- public Int2Datum getShort(int fieldId) {
- return (Int2Datum) get(fieldId);
- }
-
- @Override
- public Int4Datum getInt(int fieldId) {
- return (Int4Datum) get(fieldId);
- }
-
- @Override
- public Int8Datum getLong(int fieldId) {
- return (Int8Datum) get(fieldId);
- }
-
- @Override
- public Float4Datum getFloat(int fieldId) {
- return (Float4Datum) get(fieldId);
- }
-
- @Override
- public Float8Datum getDouble(int fieldId) {
- return (Float8Datum) get(fieldId);
- }
-
- @Override
- public Inet4Datum getIPv4(int fieldId) {
- return (Inet4Datum) get(fieldId);
- }
-
- @Override
- public byte[] getIPv4Bytes(int fieldId) {
- return get(fieldId).asByteArray();
- }
-
- @Override
- public InetAddress getIPv6(int fieldId) {
- throw new InvalidCastException("IPv6 is unsupported yet");
- }
-
- @Override
- public byte[] getIPv6Bytes(int fieldId) {
- throw new InvalidCastException("IPv6 is unsupported yet");
- }
-
- @Override
- public TextDatum getString(int fieldId) {
- return (TextDatum) get(fieldId);
- }
-
- @Override
- public TextDatum getText(int fieldId) {
- return (TextDatum) get(fieldId);
- }
-
- public byte[] getTextBytes(int fieldId) {
- if(textBytes[fieldId] != null)
- return textBytes[fieldId];
- else {
- return get(fieldId).asTextBytes();
- }
- }
-
- public String toString() {
- boolean first = true;
- StringBuilder str = new StringBuilder();
- str.append("(");
- Datum d;
- for (int i = 0; i < values.length; i++) {
- d = get(i);
- if (d != null) {
- if (first) {
- first = false;
- } else {
- str.append(", ");
- }
- str.append(i)
- .append("=>")
- .append(d);
- }
- }
- str.append(")");
- return str.toString();
- }
-
- @Override
- public int hashCode() {
- int hashCode = 37;
- for (int i = 0; i < values.length; i++) {
- Datum d = get(i);
- if (d != null) {
- hashCode ^= (d.hashCode() * 41);
- } else {
- hashCode = hashCode ^ (i + 17);
- }
- }
-
- return hashCode;
- }
-
- @Override
- public Datum[] getValues() {
- Datum[] datums = new Datum[values.length];
- for (int i = 0; i < values.length; i++) {
- datums[i] = get(i);
- }
- return datums;
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- LazyTuple lazyTuple = (LazyTuple) super.clone();
-
- lazyTuple.values = getValues(); //shallow copy
- lazyTuple.textBytes = new byte[size()][];
- return lazyTuple;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof Tuple) {
- Tuple other = (Tuple) obj;
- return Arrays.equals(getValues(), other.getValues());
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
deleted file mode 100644
index 66c610a..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * A class that provides a line reader from an input stream.
- * Depending on the constructor used, lines will either be terminated by:
- * <ul>
- * <li>one of the following: '\n' (LF) , '\r' (CR),
- * or '\r\n' (CR+LF).</li>
- * <li><em>or</em>, a custom byte sequence delimiter</li>
- * </ul>
- * In both cases, EOF also terminates an otherwise unterminated
- * line.
- */
-
-public class LineReader implements Closeable {
- private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- private int bufferSize = DEFAULT_BUFFER_SIZE;
- private InputStream in;
- private byte[] buffer;
- // the number of bytes of real data in the buffer
- private int bufferLength = 0;
- // the current position in the buffer
- private int bufferPosn = 0;
-
- private static final byte CR = '\r';
- private static final byte LF = '\n';
-
- // The line delimiter
- private final byte[] recordDelimiterBytes;
-
- /**
- * Create a line reader that reads from the given stream using the
- * default buffer-size (64k).
- *
- * @param in The input stream
- * @throws IOException
- */
- public LineReader(InputStream in) {
- this(in, DEFAULT_BUFFER_SIZE);
- }
-
- /**
- * Create a line reader that reads from the given stream using the
- * given buffer-size.
- *
- * @param in The input stream
- * @param bufferSize Size of the read buffer
- * @throws IOException
- */
- public LineReader(InputStream in, int bufferSize) {
- this.in = in;
- this.bufferSize = bufferSize;
- this.buffer = new byte[this.bufferSize];
- this.recordDelimiterBytes = null;
- }
-
- /**
- * Create a line reader that reads from the given stream using the
- * <code>io.file.buffer.size</code> specified in the given
- * <code>Configuration</code>.
- *
- * @param in input stream
- * @param conf configuration
- * @throws IOException
- */
- public LineReader(InputStream in, Configuration conf) throws IOException {
- this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
- }
-
- /**
- * Create a line reader that reads from the given stream using the
- * default buffer-size, and using a custom delimiter of array of
- * bytes.
- *
- * @param in The input stream
- * @param recordDelimiterBytes The delimiter
- */
- public LineReader(InputStream in, byte[] recordDelimiterBytes) {
- this.in = in;
- this.bufferSize = DEFAULT_BUFFER_SIZE;
- this.buffer = new byte[this.bufferSize];
- this.recordDelimiterBytes = recordDelimiterBytes;
- }
-
- /**
- * Create a line reader that reads from the given stream using the
- * given buffer-size, and using a custom delimiter of array of
- * bytes.
- *
- * @param in The input stream
- * @param bufferSize Size of the read buffer
- * @param recordDelimiterBytes The delimiter
- * @throws IOException
- */
- public LineReader(InputStream in, int bufferSize,
- byte[] recordDelimiterBytes) {
- this.in = in;
- this.bufferSize = bufferSize;
- this.buffer = new byte[this.bufferSize];
- this.recordDelimiterBytes = recordDelimiterBytes;
- }
-
- /**
- * Create a line reader that reads from the given stream using the
- * <code>io.file.buffer.size</code> specified in the given
- * <code>Configuration</code>, and using a custom delimiter of array of
- * bytes.
- *
- * @param in input stream
- * @param conf configuration
- * @param recordDelimiterBytes The delimiter
- * @throws IOException
- */
- public LineReader(InputStream in, Configuration conf,
- byte[] recordDelimiterBytes) throws IOException {
- this.in = in;
- this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
- this.buffer = new byte[this.bufferSize];
- this.recordDelimiterBytes = recordDelimiterBytes;
- }
-
-
- /**
- * Close the underlying stream.
- *
- * @throws IOException
- */
- public void close() throws IOException {
- in.close();
- }
-
- public void reset() {
- bufferLength = 0;
- bufferPosn = 0;
-
- }
-
- /**
- * Read one line from the InputStream into the given Text.
- *
- * @param str the object to store the given line (without newline)
- * @param maxLineLength the maximum number of bytes to store into str;
- * the rest of the line is silently discarded.
- * @param maxBytesToConsume the maximum number of bytes to consume
- * in this call. This is only a hint, because if the line cross
- * this threshold, we allow it to happen. It can overshoot
- * potentially by as much as one buffer length.
- * @return the number of bytes read including the (longest) newline
- * found.
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength,
- int maxBytesToConsume) throws IOException {
- if (this.recordDelimiterBytes != null) {
- return readCustomLine(str, maxLineLength, maxBytesToConsume);
- } else {
- return readDefaultLine(str, maxLineLength, maxBytesToConsume);
- }
- }
-
- protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
- throws IOException {
- return in.read(buffer);
- }
  /**
   * Read a line terminated by one of CR, LF, or CRLF into {@code str}.
   *
   * <p>{@code str} is cleared first. At most {@code maxLineLength} bytes of the line are
   * appended to it; the rest of the line is still consumed but discarded. Reading stops at the
   * first line terminator, at EOF, or once {@code maxBytesToConsume} bytes have been consumed.
   * That limit is only checked after each buffer-sized chunk, so it may be overshot.
   *
   * @param str destination for the line contents, without the terminator
   * @param maxLineLength maximum number of bytes to store into {@code str}
   * @param maxBytesToConsume soft limit on the bytes consumed by this call
   * @return number of bytes consumed, including the terminator bytes
   * @throws IOException if the underlying stream fails, or if more than
   *         {@code Integer.MAX_VALUE} bytes were consumed
   */
  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *   everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true if prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        // Buffer exhausted: refill it from the stream.
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      // Only the line contents (not the terminator) are stored, capped at maxLineLength.
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > (long) Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int) bytesConsumed;
  }
-
  /**
   * Read a line terminated by one of CR, LF, or CRLF, writing its bytes to {@code str} and
   * recording the number of bytes stored for this line in {@code offsets}.
   *
   * <p>Unlike {@link #readLine(Text, int, int)}, this method does not reset {@code str}; the
   * line bytes are passed to {@code str.write}. If any bytes were consumed, the per-line stored
   * length ({@code txtLength}, which starts at 0 for every call) is appended to {@code offsets}.
   * It is not a cumulative position. NOTE(review): callers presumably turn these lengths into
   * offsets themselves; verify.
   *
   * @param str destination the line bytes are written to (terminator excluded)
   * @param offsets receives this line's stored length when bytes were consumed
   * @param maxLineLength maximum number of bytes of the line to write to {@code str}
   * @param maxBytesToConsume soft limit on the bytes consumed by this call, checked only after
   *        each buffer-sized chunk
   * @return number of bytes consumed, including the terminator bytes
   * @throws IOException if the underlying stream fails, or if more than
   *         {@code Integer.MAX_VALUE} bytes were consumed
   */
  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
      , int maxBytesToConsume)
      throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *   everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */

    int txtLength = 0; //bytes of this line written to str
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true if prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        // Buffer exhausted: refill it from the stream.
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      // Only the line contents (not the terminator) are written, capped at maxLineLength.
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.write(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > (long) Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }

    // At EOF with nothing consumed, no entry is recorded.
    if (bytesConsumed > 0) offsets.add(txtLength);
    return (int) bytesConsumed;
  }
-
- /**
- * Read a line terminated by one of CR, LF, or CRLF.
- */
-
-/* int validIdx = 0;
- public int readDefaultLines(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, ArrayList<Long> foffsets,
- long pos, int maxLineLength, int maxBytesToConsume)
- throws IOException {
- *//* We're reading data from in, but the head of the stream may be
- * already buffered in buffer, so we have several cases:
- * 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
- * 2. An unambiguously terminated line is in buffer, so we just
- * copy to str.
- * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to str, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
- * We use a flag prevCharCR to signal if previous character was CR
- * and, if it happens to be at the end of the buffer, delay
- * consuming it until we have a chance to look at the char that
- * follows.
- *//*
- //str.clear();
- str.reset();
- offsets.clear();
- foffsets.clear();
-
- validIdx = 0;
- long bufferBytesConsumed = 0;
-
- int txtLength = 0; //tracks str.getLength(), as an optimization
- int newlineLength = 0; //length of terminating newline
- boolean prevCharCR = false; //true of prev char was CR
- long bytesConsumed = 0;
- do {
-
- int startPosn = bufferPosn; //starting from where we left off the last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- if (prevCharCR) {
- ++bytesConsumed; //account for CR from previous read
- }
- bufferLength = in.read(buffer);
- if (bufferLength <= 0) {
- break; // EOF
- }
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (buffer[bufferPosn] == LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- }
- if (prevCharCR) { //CR + notLF, we are at notLF
- newlineLength = 1;
- break;
- }
- prevCharCR = (buffer[bufferPosn] == CR);
- }
- int readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0) {
- --readLength; //CR at the end of the buffer
- }
- bytesConsumed += readLength;
- int appendLength = readLength - newlineLength;
- if (appendLength > maxLineLength - txtLength) {
- appendLength = maxLineLength - txtLength;
- }
-
- if (appendLength > 0) {
- str.write(buffer, startPosn, appendLength);
- //System.out.println(startPosn + "," + appendLength);
- //str.append(buffer, startPosn, appendLength);
- txtLength += appendLength;
- }
-
- if(newlineLength > 0){
- validIdx++;
-
- if (bytesConsumed > (long)Integer.MAX_VALUE) {
- throw new IOException("Too many bytes before newline: " + bytesConsumed);
- }
- offsets.add(txtLength);
- foffsets.add(pos);
- pos+= bytesConsumed;
- bufferBytesConsumed += bytesConsumed;
-
- txtLength = 0;
- newlineLength = 0;
- prevCharCR = false; //true of prev char was CR
- bytesConsumed = 0;
- } else {
- bufferBytesConsumed += bytesConsumed;
- bytesConsumed = 0;
- }
- } while ((bufferBytesConsumed < 256 * 1024));
-
- return (int)bufferBytesConsumed;
- }*/
-
- /**
-  * Read a line terminated by the custom delimiter {@code recordDelimiterBytes}.
-  *
-  * <p>The head of the line may already be buffered from a previous call, and a buffer
-  * may end in the middle of a delimiter. Bytes at the buffer tail that match a prefix
-  * of the delimiter are "ambiguous": they are held back (neither stored nor counted)
-  * until the next buffer shows whether they start a real delimiter (then they are
-  * consumed as part of it) or are ordinary content (then they are stored in
-  * {@code str}, case 2.2). Example: with delimiter "record", a buffer ending in
-  * "...I have completely re" followed by a buffer starting "ad about it." means the
-  * held-back "re" is content.
-  *
-  * @param str receives the line without its delimiter; cleared first
-  * @param maxLineLength maximum number of bytes to store into {@code str}
-  * @param maxBytesToConsume stop reading once at least this many bytes were consumed
-  * @return the number of bytes consumed, including the delimiter
-  * @throws IOException if the underlying stream fails, or more than
-  *     {@link Integer#MAX_VALUE} bytes are consumed before a delimiter
-  */
- private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
-     throws IOException {
-   str.clear();
-   int txtLength = 0; // tracks str.getLength(), as an optimization
-   long bytesConsumed = 0;
-   int delPosn = 0; // number of delimiter bytes matched so far
-   int ambiguousByteCount = 0; // delimiter-prefix bytes held back from the previous buffer
-   do {
-     int startPosn = bufferPosn; // start from the previous end position
-     if (bufferPosn >= bufferLength) {
-       startPosn = bufferPosn = 0;
-       bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
-       if (bufferLength <= 0) {
-         // EOF: held-back bytes can no longer complete a delimiter, so they are content
-         // and must be both stored and counted.
-         if (ambiguousByteCount > 0) {
-           int keep = Math.min(ambiguousByteCount, maxLineLength - txtLength);
-           if (keep > 0) {
-             str.append(recordDelimiterBytes, 0, keep);
-             txtLength += keep;
-           }
-           bytesConsumed += ambiguousByteCount;
-         }
-         break; // EOF
-       }
-     }
-     for (; bufferPosn < bufferLength; ++bufferPosn) {
-       if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
-         delPosn++;
-         if (delPosn >= recordDelimiterBytes.length) {
-           bufferPosn++;
-           break;
-         }
-       } else if (delPosn != 0) {
-         // Restart matching one byte after the start of the failed partial match, so
-         // self-overlapping delimiters (e.g. "aab" inside "aaab") are still found.
-         // A partial match that began in the previous buffer can only restart at 0.
-         bufferPosn = Math.max(bufferPosn - delPosn, -1);
-         delPosn = 0;
-       }
-     }
-     int readLength = bufferPosn - startPosn;
-     bytesConsumed += readLength;
-     // Bytes of this buffer that belong to the line. Negative exactly when the current
-     // (complete or pending) delimiter match began inside the held-back bytes.
-     int contentLength = readLength - delPosn;
-     if (ambiguousByteCount > 0) {
-       bytesConsumed += ambiguousByteCount; // the held-back bytes are resolved now
-       if (contentLength >= 0) {
-         // Case 2.2: the held-back bytes did not start a delimiter, so they are content.
-         int keep = Math.min(ambiguousByteCount, maxLineLength - txtLength);
-         if (keep > 0) {
-           str.append(recordDelimiterBytes, 0, keep);
-           txtLength += keep;
-         }
-       }
-       ambiguousByteCount = 0;
-     }
-     int appendLength = Math.min(contentLength, maxLineLength - txtLength);
-     if (appendLength > 0) {
-       str.append(buffer, startPosn, appendLength);
-       txtLength += appendLength;
-     }
-     if (bufferPosn >= bufferLength
-         && delPosn > 0 && delPosn < recordDelimiterBytes.length) {
-       // The buffer ends with a delimiter prefix: hold it back until the next buffer
-       // decides what it is.
-       ambiguousByteCount = delPosn;
-       bytesConsumed -= ambiguousByteCount; // to be consumed in a later iteration
-     }
-   } while (delPosn < recordDelimiterBytes.length
-       && bytesConsumed < maxBytesToConsume);
-   if (bytesConsumed > (long) Integer.MAX_VALUE) {
-     throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
-   }
-   return (int) bytesConsumed;
- }
-
- /**
-  * Reads one line from the stream into {@code str}, placing no limit on how many bytes
-  * may be consumed while searching for the end of the line.
-  *
-  * @param str destination for the line that was read
-  * @param maxLineLength upper bound on the number of bytes stored into {@code str}
-  * @return bytes taken from the stream, counting the line terminator
-  * @throws IOException propagated from the underlying stream
-  */
- public int readLine(Text str, int maxLineLength) throws IOException {
-   final int noConsumeLimit = Integer.MAX_VALUE;
-   return readLine(str, maxLineLength, noConsumeLimit);
- }
-
- /**
-  * Reads one line from the stream into {@code str} without bounding either the stored
-  * line length or the number of bytes consumed.
-  *
-  * @param str destination for the line that was read
-  * @return bytes taken from the stream, counting the line terminator
-  * @throws IOException propagated from the underlying stream
-  */
- public int readLine(Text str) throws IOException {
-   final int unbounded = Integer.MAX_VALUE;
-   return readLine(str, unbounded, unbounded);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
deleted file mode 100644
index e4439f3..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
- /**
-  * A {@link Scanner} that reads several file fragments one after another as a single
-  * input. The fragments are sorted first, and each is read through a scanner obtained
-  * from {@link StorageManagerFactory}; at most one underlying scanner is open at a time.
-  *
-  * <p>{@link #isProjectable()} and {@link #isSelectable()} report the capabilities of the
-  * scanner opened for the first fragment (false when there are no fragments).
-  * {@link #setTarget(Column[])} only affects scanners opened afterwards.
-  */
- public class MergeScanner implements Scanner {
-   private Configuration conf;
-   private TableMeta meta;
-   private Schema schema;
-   private List<FileFragment> fragments;
-   private Iterator<FileFragment> iterator;
-   private FileFragment currentFragment;
-   private Scanner currentScanner;
-   private Tuple tuple;
-   private boolean projectable = false;
-   private boolean selectable = false;
-   private Schema target;
-
-   public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList)
-       throws IOException {
-     this(conf, schema, meta, rawFragmentList, schema);
-   }
-
-   /**
-    * @param target the target schema handed to every underlying scanner
-    */
-   public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList,
-                       Schema target)
-       throws IOException {
-     this.conf = conf;
-     this.schema = schema;
-     this.meta = meta;
-     this.fragments = Lists.newArrayList();
-     for (Fragment f : rawFragmentList) {
-       fragments.add((FileFragment) f);
-     }
-     Collections.sort(fragments);
-
-     this.target = target;
-     this.reset();
-     if (currentScanner != null) {
-       this.projectable = currentScanner.isProjectable();
-       this.selectable = currentScanner.isSelectable();
-     }
-   }
-
-   @Override
-   public void init() throws IOException {
-   }
-
-   /**
-    * Returns the next tuple from the current fragment, moving on to later fragments as
-    * each is exhausted.
-    *
-    * @return the next tuple, or null once every fragment has been read
-    */
-   @Override
-   public Tuple next() throws IOException {
-     // Loop rather than advancing once: a fragment may be empty, and stopping at the
-     // first empty fragment would silently skip every fragment after it.
-     while (currentScanner != null) {
-       tuple = currentScanner.next();
-       if (tuple != null) {
-         return tuple;
-       }
-       currentScanner.close();
-       currentScanner = getNextScanner();
-     }
-     tuple = null;
-     return null;
-   }
-
-   /** Restarts reading from the first fragment, closing any scanner left open. */
-   @Override
-   public void reset() throws IOException {
-     if (currentScanner != null) {
-       // Without this the scanner from the previous pass would be leaked.
-       currentScanner.close();
-       currentScanner = null;
-     }
-     this.iterator = fragments.iterator();
-     this.currentScanner = getNextScanner();
-   }
-
-   /**
-    * Opens and initializes a scanner for the next fragment and stores it in
-    * {@code currentScanner}.
-    *
-    * @return the new scanner, or null when no fragments remain
-    */
-   private Scanner getNextScanner() throws IOException {
-     if (iterator.hasNext()) {
-       currentFragment = iterator.next();
-       currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, schema,
-           currentFragment, target);
-       currentScanner.init();
-       return currentScanner;
-     } else {
-       return null;
-     }
-   }
-
-   @Override
-   public void close() throws IOException {
-     if(currentScanner != null) {
-       currentScanner.close();
-       // Drop the reference so a later next() returns null instead of reading a closed scanner.
-       currentScanner = null;
-     }
-     iterator = null;
-     if(fragments != null) {
-       fragments.clear();
-     }
-   }
-
-   @Override
-   public boolean isProjectable() {
-     return projectable;
-   }
-
-   @Override
-   public void setTarget(Column[] targets) {
-     this.target = new Schema(targets);
-   }
-
-   @Override
-   public boolean isSelectable() {
-     return selectable;
-   }
-
-   @Override
-   public void setSearchCondition(Object expr) {
-   }
-
-   @Override
-   public Schema getSchema() {
-     return schema;
-   }
-
-   @Override
-   public boolean isSplittable(){
-     return false;
-   }
- }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
deleted file mode 100644
index 94d13ee..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.Path;
-
-import java.util.Comparator;
-
- /**
-  * Orders paths by reading the last path component ({@link Path#getName()}) as a decimal
-  * integer, so ".../2" sorts before ".../10".
-  *
-  * <p>{@link Integer#parseInt(String)} throws {@link NumberFormatException} for a name
-  * that is not an integer.
-  */
- public class NumericPathComparator implements Comparator<Path> {
-
-   @Override
-   public int compare(Path p1, Path p2) {
-     int num1 = Integer.parseInt(p1.getName());
-     int num2 = Integer.parseInt(p2.getName());
-
-     // Explicit comparison instead of num1 - num2: the subtraction overflows for large
-     // operands of opposite sign and would then invert the ordering.
-     return (num1 < num2) ? -1 : ((num1 == num2) ? 0 : 1);
-   }
- }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
deleted file mode 100644
index db511dc..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.protobuf.Message;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
-import org.apache.tajo.datum.TimestampDatum;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BitArray;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
-
-public class RawFile {
- private static final Log LOG = LogFactory.getLog(RawFile.class);
-
- /**
-  * Reader for the RawFile format produced by {@link RawFileAppender}. Each record is laid
-  * out as {@code [int recordSize][short nullFlagsLength][null flags][values]}, with values
-  * present only for non-null columns.
-  *
-  * <p>The path is opened with {@link FileInputStream}, so only local files work. Data is
-  * read through a direct {@link ByteBuffer}. Between calls, buffer indices
-  * {@code [0, limit)} hold the file bytes that end at {@code channel.position()};
-  * {@link #getNextOffset()} and {@link #seek(long)} rely on that. {@link #next()} fills
-  * and returns the same {@link Tuple} instance on every call.
-  */
- public static class RawFileScanner extends FileScanner implements SeekableScanner {
-   private FileChannel channel;
-   private DataType[] columnTypes;
-   private Path path;
-
-   private ByteBuffer buffer;
-   private Tuple tuple;
-
-   private int headerSize = 0;
-   private BitArray nullFlags;
-   private static final int RECORD_SIZE = 4;
-   private boolean eof = false;
-   private long fileSize;
-   private FileInputStream fis;
-
-   public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-     super(conf, schema, meta, null);
-     this.path = path;
-     init();
-   }
-
-   @SuppressWarnings("unused")
-   public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
-     this(conf, schema, meta, fragment.getPath());
-   }
-
-   /**
-    * Opens the file, reads the first buffer and prepares per-column decoding state.
-    * The constructor already calls this; a further call reopens the file from offset 0.
-    */
-   public void init() throws IOException {
-     //Preconditions.checkArgument(FileUtil.isLocalPath(path));
-     // TODO - to make it unified one.
-     if (fis != null) {
-       // Called again after the constructor's init(): release the earlier handle instead
-       // of leaking it. Closing the stream also closes its channel.
-       fis.close();
-     }
-     URI uri = path.toUri();
-     fis = new FileInputStream(new File(uri));
-     channel = fis.getChannel();
-     fileSize = channel.size();
-     eof = false;
-
-     if (LOG.isDebugEnabled()) {
-       LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
-     }
-
-     buffer = ByteBuffer.allocateDirect(128 * 1024);
-
-     columnTypes = new DataType[schema.getColumnNum()];
-     for (int i = 0; i < schema.getColumnNum(); i++) {
-       columnTypes[i] = schema.getColumn(i).getDataType();
-     }
-
-     tuple = new VTuple(columnTypes.length);
-
-     // initial read
-     channel.read(buffer);
-     buffer.flip();
-
-     nullFlags = new BitArray(schema.getColumnNum());
-     headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
-     super.init();
-   }
-
-   /** Returns the file offset of the next unread byte. */
-   @Override
-   public long getNextOffset() throws IOException {
-     return channel.position() - buffer.remaining();
-   }
-
-   /**
-    * Positions the scanner so the next record is read from {@code offset}. Reuses the
-    * buffer when the offset is already buffered; otherwise rereads from the channel.
-    */
-   @Override
-   public void seek(long offset) throws IOException {
-     long bufferEnd = channel.position();           // file offset just past the buffered bytes
-     long bufferStart = bufferEnd - buffer.limit(); // file offset of buffer index 0
-     if (bufferStart <= offset && offset < bufferEnd) {
-       buffer.position((int) (offset - bufferStart));
-     } else {
-       buffer.clear();
-       channel.position(offset);
-       channel.read(buffer);
-       buffer.flip();
-     }
-     // Either way there may be records again, even if the end had been reached before.
-     eof = false;
-   }
-
-   /**
-    * Moves the unread bytes to the front of the buffer and appends more from the channel.
-    *
-    * @return false (and sets {@code eof}) when the channel is at end of file
-    */
-   private boolean fillBuffer() throws IOException {
-     buffer.compact();
-     int bytesRead = channel.read(buffer);
-     // Flip in every case so the buffer is back in read mode even at EOF; otherwise
-     // getNextOffset() and seek() would see a buffer left in write mode.
-     buffer.flip();
-     if (bytesRead == -1) {
-       eof = true;
-       return false;
-     }
-     return true;
-   }
-
-   /**
-    * Decodes the next record.
-    *
-    * @return the shared tuple holding the record, or null at end of file
-    */
-   @Override
-   public Tuple next() throws IOException {
-     if(eof) return null;
-
-     if (buffer.remaining() < headerSize) {
-       if (!fillBuffer()) {
-         return null;
-       }
-     }
-
-     // backup the buffer state
-     int bufferLimit = buffer.limit();
-     int recordSize = buffer.getInt();
-     int nullFlagSize = buffer.getShort();
-
-     // expose only the null-flag bytes to the BitArray
-     buffer.limit(buffer.position() + nullFlagSize);
-     nullFlags.fromByteBuffer(buffer);
-     // restore the start of record contents
-     buffer.limit(bufferLimit);
-     if (buffer.remaining() < (recordSize - headerSize)) {
-       if (!fillBuffer()) {
-         return null;
-       }
-     }
-
-     for (int i = 0; i < columnTypes.length; i++) {
-       // check if the i'th column is null
-       if (nullFlags.get(i)) {
-         tuple.put(i, DatumFactory.createNullDatum());
-         continue;
-       }
-
-       switch (columnTypes[i].getType()) {
-         case BOOLEAN :
-           tuple.put(i, DatumFactory.createBool(buffer.get()));
-           break;
-
-         case BIT :
-           tuple.put(i, DatumFactory.createBit(buffer.get()));
-           break;
-
-         case CHAR :
-           // stored as [int realLength][declared-length bytes]
-           int realLen = buffer.getInt();
-           byte[] buf = new byte[columnTypes[i].getLength()];
-           buffer.get(buf);
-           byte[] charBuf = Arrays.copyOf(buf, realLen);
-           tuple.put(i, DatumFactory.createChar(charBuf));
-           break;
-
-         case INT2 :
-           tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
-           break;
-
-         case INT4 :
-           tuple.put(i, DatumFactory.createInt4(buffer.getInt()));
-           break;
-
-         case INT8 :
-           tuple.put(i, DatumFactory.createInt8(buffer.getLong()));
-           break;
-
-         case FLOAT4 :
-           tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
-           break;
-
-         case FLOAT8 :
-           tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
-           break;
-
-         case TEXT :
-           // TODO - shoud use CharsetEncoder / CharsetDecoder
-           // NOTE(review): decodes with the platform default charset; this must match how
-           // the writer's asByteArray() encoded the text - confirm.
-           int strSize2 = buffer.getInt();
-           byte [] strBytes2 = new byte[strSize2];
-           buffer.get(strBytes2);
-           tuple.put(i, DatumFactory.createText(new String(strBytes2)));
-           break;
-
-         case TIMESTAMP:
-           tuple.put(i, DatumFactory.createTimeStampFromMillis(buffer.getLong()));
-           break;
-
-         case BLOB : {
-           int byteSize = buffer.getInt();
-           byte [] rawBytes = new byte[byteSize];
-           buffer.get(rawBytes);
-           tuple.put(i, DatumFactory.createBlob(rawBytes));
-           break;
-         }
-
-         case PROTOBUF: {
-           int byteSize = buffer.getInt();
-           byte [] rawBytes = new byte[byteSize];
-           buffer.get(rawBytes);
-
-           ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
-           Message.Builder builder = factory.newBuilder();
-           builder.mergeFrom(rawBytes);
-           tuple.put(i, factory.createDatum(builder.build()));
-           break;
-         }
-
-         case INET4 :
-           byte [] ipv4Bytes = new byte[4];
-           buffer.get(ipv4Bytes);
-           tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
-           break;
-
-         case NULL_TYPE:
-           tuple.put(i, NullDatum.get());
-           break;
-
-         default:
-       }
-     }
-
-     if(!buffer.hasRemaining() && channel.position() == fileSize){
-       eof = true;
-     }
-     return tuple;
-   }
-
-   /** Rewinds to the beginning of the file. */
-   @Override
-   public void reset() throws IOException {
-     // clear the buffer
-     buffer.clear();
-     // reload initial buffer
-     channel.position(0);
-     channel.read(buffer);
-     buffer.flip();
-     eof = false;
-   }
-
-   @Override
-   public void close() throws IOException {
-     buffer.clear();
-     channel.close();
-     fis.close();
-   }
-
-   @Override
-   public boolean isProjectable() {
-     return false;
-   }
-
-   @Override
-   public boolean isSelectable() {
-     return false;
-   }
-
-   @Override
-   public boolean isSplittable(){
-     return false;
-   }
- }
-
- /**
-  * Writer for the RawFile format read by {@link RawFileScanner}. Each record is written
-  * as {@code [int recordSize][short nullFlagsLength][null flags][values]}, with values
-  * only for non-null columns.
-  *
-  * <p>Records are assembled in a 64 KB direct buffer and written through the
-  * {@link FileChannel} of a {@link RandomAccessFile}; the path is converted with
-  * {@code new File(path.toUri())}, so only local files work.
-  */
- public static class RawFileAppender extends FileAppender {
-   private FileChannel channel;
-   private RandomAccessFile randomAccessFile;
-   private DataType[] columnTypes;
-
-   private ByteBuffer buffer;
-   private BitArray nullFlags;
-   private int headerSize = 0;
-   private static final int RECORD_SIZE = 4;
-   private long pos;
-
-   private TableStatistics stats;
-
-   public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-     super(conf, schema, meta, path);
-   }
-
-   public void init() throws IOException {
-     // TODO - RawFile only works on Local File System.
-     //Preconditions.checkArgument(FileUtil.isLocalPath(path));
-     File file = new File(path.toUri());
-     randomAccessFile = new RandomAccessFile(file, "rw");
-     channel = randomAccessFile.getChannel();
-     // "rw" does not truncate an existing file; writing starts at offset 0, so drop any
-     // older content that would otherwise remain after the new records.
-     channel.truncate(0);
-     pos = 0;
-
-     columnTypes = new DataType[schema.getColumnNum()];
-     for (int i = 0; i < schema.getColumnNum(); i++) {
-       columnTypes[i] = schema.getColumn(i).getDataType();
-     }
-
-     buffer = ByteBuffer.allocateDirect(64 * 1024);
-
-     // compute the number of bytes representing the null flags
-     nullFlags = new BitArray(schema.getColumnNum());
-     headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
-     if (enabledStats) {
-       this.stats = new TableStatistics(this.schema);
-     }
-
-     super.init();
-   }
-
-   /** Returns the number of record bytes appended so far. */
-   @Override
-   public long getOffset() throws IOException {
-     return pos;
-   }
-
-   /** Writes every buffered byte to the channel and empties the buffer. */
-   private void flushBuffer() throws IOException {
-     buffer.flip();
-     while (buffer.hasRemaining()) {
-       channel.write(buffer);
-     }
-     buffer.clear();
-   }
-
-   /**
-    * Makes room when fewer than {@code sizeToBeWritten} bytes remain: writes the complete
-    * records in {@code [0, recordOffset)} and moves the partial record at
-    * {@code recordOffset} to the front of the buffer.
-    *
-    * @return true if the partial record was moved, i.e. it now starts at offset 0
-    */
-   private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
-       throws IOException {
-     if (buffer.remaining() < sizeToBeWritten) {
-       int end = buffer.position();
-       buffer.position(0);
-       buffer.limit(recordOffset);
-       while (buffer.hasRemaining()) {
-         channel.write(buffer);
-       }
-       buffer.limit(end);
-       buffer.position(recordOffset);
-       buffer.compact();
-       return true;
-     } else {
-       return false;
-     }
-   }
-
-   @Override
-   public void addTuple(Tuple t) throws IOException {
-
-     if (buffer.remaining() < headerSize) {
-       flushBuffer();
-     }
-
-     // skip the row header; it is written once the record length is known
-     int recordOffset = buffer.position();
-     buffer.position(recordOffset + headerSize);
-     // reset the null flags
-     nullFlags.clear();
-     for (int i = 0; i < schema.getColumnNum(); i++) {
-       if (enabledStats) {
-         stats.analyzeField(i, t.get(i));
-       }
-
-       if (t.isNull(i)) {
-         nullFlags.set(i);
-         continue;
-       }
-
-       // 8 bytes covers every fixed-width value below; variable-width values
-       // (CHAR, TEXT, BLOB, PROTOBUF) reserve their own space.
-       if (flushBufferAndReplace(recordOffset, 8)) {
-         recordOffset = 0;
-       }
-
-       switch(columnTypes[i].getType()) {
-         case NULL_TYPE:
-           nullFlags.set(i);
-           continue;
-
-         case BOOLEAN:
-         case BIT:
-           buffer.put(t.get(i).asByte());
-           break;
-
-         case CHAR : {
-           byte[] src = t.getChar(i).asByteArray();
-           int fixedLength = columnTypes[i].getLength();
-           byte[] dst = Arrays.copyOf(src, fixedLength);
-           // A CHAR takes 4 + fixedLength bytes, which can exceed the 8 reserved above.
-           if (flushBufferAndReplace(recordOffset, fixedLength + 4)) {
-             recordOffset = 0;
-           }
-           // Store how many bytes of dst are meaningful; a longer value is truncated to the
-           // declared length, and the reader copies exactly this many bytes back.
-           buffer.putInt(Math.min(src.length, fixedLength));
-           buffer.put(dst);
-           break;
-         }
-
-         case INT2 :
-           buffer.putShort(t.get(i).asInt2());
-           break;
-
-         case INT4 :
-           buffer.putInt(t.get(i).asInt4());
-           break;
-
-         case INT8 :
-           buffer.putLong(t.get(i).asInt8());
-           break;
-
-         case FLOAT4 :
-           buffer.putFloat(t.get(i).asFloat4());
-           break;
-
-         case FLOAT8 :
-           buffer.putDouble(t.get(i).asFloat8());
-           break;
-
-         case TEXT:
-           byte [] strBytes2 = t.get(i).asByteArray();
-           if (flushBufferAndReplace(recordOffset, strBytes2.length + 4)) {
-             recordOffset = 0;
-           }
-           buffer.putInt(strBytes2.length);
-           buffer.put(strBytes2);
-           break;
-
-         case TIMESTAMP:
-           buffer.putLong(((TimestampDatum)t.get(i)).getMillis());
-           break;
-
-         case BLOB : {
-           byte [] rawBytes = t.get(i).asByteArray();
-           if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
-             recordOffset = 0;
-           }
-           buffer.putInt(rawBytes.length);
-           buffer.put(rawBytes);
-           break;
-         }
-
-         case PROTOBUF: {
-           // TODO - to be fixed: consider a varint length prefix instead of a fixed int.
-           byte [] rawBytes = t.get(i).asByteArray();
-           if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
-             recordOffset = 0;
-           }
-           buffer.putInt(rawBytes.length);
-           buffer.put(rawBytes);
-           break;
-         }
-
-         case INET4 :
-           buffer.put(t.get(i).asByteArray());
-           break;
-
-         default:
-           throw new IOException("Cannot support data type: " + columnTypes[i].getType());
-       }
-     }
-
-     // write a record header
-     int bufferPos = buffer.position();
-     buffer.position(recordOffset);
-     buffer.putInt(bufferPos - recordOffset);
-     byte [] flags = nullFlags.toArray();
-     buffer.putShort((short) flags.length);
-     buffer.put(flags);
-
-     pos += bufferPos - recordOffset;
-     buffer.position(bufferPos);
-
-     if (enabledStats) {
-       stats.incrementRow();
-     }
-   }
-
-   @Override
-   public void flush() throws IOException {
-     flushBuffer();
-   }
-
-   @Override
-   public void close() throws IOException {
-     flush();
-     if (enabledStats) {
-       stats.setNumBytes(getOffset());
-     }
-     if (LOG.isDebugEnabled()) {
-       LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
-     }
-     channel.close();
-     randomAccessFile.close();
-   }
-
-   /** Returns the collected statistics, or null when statistics are disabled. */
-   @Override
-   public TableStats getStats() {
-     if (enabledStats) {
-       return stats.getTableStat();
-     } else {
-       return null;
-     }
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
deleted file mode 100644
index 1e89f31..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BitArray;
-import org.apache.tajo.util.Bytes;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-
-public class RowFile {
- public static final Log LOG = LogFactory.getLog(RowFile.class);
-
- private static final int SYNC_ESCAPE = -1;
- private static final int SYNC_HASH_SIZE = 16;
- private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
- private final static int DEFAULT_BUFFER_SIZE = 65535;
- public static int SYNC_INTERVAL;
-
- public static class RowFileScanner extends FileScanner {
- private FileSystem fs;
- private FSDataInputStream in;
- private Tuple tuple;
-
- private byte[] sync = new byte[SYNC_HASH_SIZE];
- private byte[] checkSync = new byte[SYNC_HASH_SIZE];
- private long start, end;
-
- private ByteBuffer buffer;
- private final int tupleHeaderSize;
- private BitArray nullFlags;
- private long bufferStartPos;
-
- public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
- throws IOException {
- super(conf, schema, meta, fragment);
-
- SYNC_INTERVAL =
- conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname,
- SYNC_SIZE * 100);
-
- nullFlags = new BitArray(schema.getColumnNum());
- tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
- this.start = fragment.getStartKey();
- this.end = this.start + fragment.getEndKey();
- }
-
- public void init() throws IOException {
- // set default page size.
- fs = fragment.getPath().getFileSystem(conf);
- in = fs.open(fragment.getPath());
- buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.getColumnNum());
- buffer.flip();
-
- readHeader();
-
- // find the correct position from the start
- if (this.start > in.getPos()) {
- long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
- in.seek(realStart);
- }
- bufferStartPos = in.getPos();
- fillBuffer();
-
- if (start != 0) {
- // TODO: improve
- boolean syncFound = false;
- while (!syncFound) {
- if (buffer.remaining() < SYNC_SIZE) {
- fillBuffer();
- }
- buffer.mark();
- syncFound = checkSync();
- if (!syncFound) {
- buffer.reset();
- buffer.get(); // proceed one byte
- }
- }
- bufferStartPos += buffer.position();
- buffer.compact();
- buffer.flip();
- }
-
- super.init();
- }
-
- private void readHeader() throws IOException {
- SYNC_INTERVAL = in.readInt();
- Bytes.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
- }
-
- /**
- * Find the sync from the front of the buffer
- *
- * @return return true if it succeeds to find the sync.
- * @throws IOException
- */
- private boolean checkSync() throws IOException {
- buffer.getInt(); // escape
- buffer.get(checkSync, 0, SYNC_HASH_SIZE); // sync
- return Arrays.equals(checkSync, sync);
- }
-
- private int fillBuffer() throws IOException {
- bufferStartPos += buffer.position();
- buffer.compact();
- int remain = buffer.remaining();
- int read = in.read(buffer);
- if (read == -1) {
- buffer.flip();
- return read;
- } else {
- int totalRead = read;
- if (remain > totalRead) {
- read = in.read(buffer);
- totalRead += read > 0 ? read : 0;
- }
- buffer.flip();
- return totalRead;
- }
- }
-
- @Override
- public Tuple next() throws IOException {
- while (buffer.remaining() < SYNC_SIZE) {
- if (fillBuffer() < 0) {
- return null;
- }
- }
-
- buffer.mark();
- if (!checkSync()) {
- buffer.reset();
- } else {
- if (bufferStartPos + buffer.position() > end) {
- return null;
- }
- }
-
- while (buffer.remaining() < tupleHeaderSize) {
- if (fillBuffer() < 0) {
- return null;
- }
- }
-
- int i;
- tuple = new VTuple(schema.getColumnNum());
-
- int nullFlagSize = buffer.getShort();
- byte[] nullFlagBytes = new byte[nullFlagSize];
- buffer.get(nullFlagBytes, 0, nullFlagSize);
- nullFlags = new BitArray(nullFlagBytes);
- int tupleSize = buffer.getShort();
-
- while (buffer.remaining() < (tupleSize)) {
- if (fillBuffer() < 0) {
- return null;
- }
- }
-
- Datum datum;
- Column col;
- for (i = 0; i < schema.getColumnNum(); i++) {
- if (!nullFlags.get(i)) {
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case BOOLEAN :
- datum = DatumFactory.createBool(buffer.get());
- tuple.put(i, datum);
- break;
-
- case BIT:
- datum = DatumFactory.createBit(buffer.get());
- tuple.put(i, datum );
- break;
-
- case CHAR :
- int realLen = buffer.getInt();
- byte[] buf = new byte[col.getDataType().getLength()];
- buffer.get(buf);
- byte[] charBuf = Arrays.copyOf(buf, realLen);
- tuple.put(i, DatumFactory.createChar(charBuf));
- break;
-
- case INT2 :
- datum = DatumFactory.createInt2(buffer.getShort());
- tuple.put(i, datum );
- break;
-
- case INT4 :
- datum = DatumFactory.createInt4(buffer.getInt());
- tuple.put(i, datum );
- break;
-
- case INT8 :
- datum = DatumFactory.createInt8(buffer.getLong());
- tuple.put(i, datum );
- break;
-
- case FLOAT4 :
- datum = DatumFactory.createFloat4(buffer.getFloat());
- tuple.put(i, datum);
- break;
-
- case FLOAT8 :
- datum = DatumFactory.createFloat8(buffer.getDouble());
- tuple.put(i, datum);
- break;
-
-// case TEXT :
-// short len = buffer.getShort();
-// byte[] buf = new byte[len];
-// buffer.get(buf, 0, len);
-// datum = DatumFactory.createText(buf);
-// tuple.put(i, datum);
-// break;
-
- case TEXT:
- short bytelen = buffer.getShort();
- byte[] strbytes = new byte[bytelen];
- buffer.get(strbytes, 0, bytelen);
- datum = DatumFactory.createText(strbytes);
- tuple.put(i, datum);
- break;
-
- case BLOB:
- short bytesLen = buffer.getShort();
- byte [] bytesBuf = new byte[bytesLen];
- buffer.get(bytesBuf);
- datum = DatumFactory.createBlob(bytesBuf);
- tuple.put(i, datum);
- break;
-
- case INET4 :
- byte[] ipv4 = new byte[4];
- buffer.get(ipv4, 0, 4);
- datum = DatumFactory.createInet4(ipv4);
- tuple.put(i, datum);
- break;
-
- default:
- break;
- }
- } else {
- tuple.put(i, DatumFactory.createNullDatum());
- }
- }
- return tuple;
- }
-
- @Override
- public void reset() throws IOException {
- init();
- }
-
- @Override
- public void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
-
- @Override
- public boolean isProjectable() {
- return false;
- }
-
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- @Override
- public boolean isSplittable(){
- return true;
- }
- }
-
- public static class RowFileAppender extends FileAppender {
- private FSDataOutputStream out;
- private long lastSyncPos;
- private FileSystem fs;
- private byte[] sync;
- private ByteBuffer buffer;
-
- private BitArray nullFlags;
- // statistics
- private TableStatistics stats;
-
- public RowFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
- throws IOException {
- super(conf, schema, meta, path);
- }
-
- public void init() throws IOException {
- SYNC_INTERVAL = conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname, 100);
-
- fs = path.getFileSystem(conf);
-
- if (!fs.exists(path.getParent())) {
- throw new FileNotFoundException(path.toString());
- }
-
- if (fs.exists(path)) {
- throw new AlreadyExistsStorageException(path);
- }
-
- sync = new byte[SYNC_HASH_SIZE];
- lastSyncPos = 0;
-
- out = fs.create(path);
-
- MessageDigest md;
- try {
- md = MessageDigest.getInstance("MD5");
- md.update((path.toString()+System.currentTimeMillis()).getBytes());
- sync = md.digest();
- } catch (NoSuchAlgorithmException e) {
- LOG.error(e);
- }
-
- writeHeader();
-
- buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-
- nullFlags = new BitArray(schema.getColumnNum());
-
- if (enabledStats) {
- this.stats = new TableStatistics(this.schema);
- }
- }
-
- private void writeHeader() throws IOException {
- out.writeInt(SYNC_INTERVAL);
- out.write(sync);
- out.flush();
- lastSyncPos = out.getPos();
- }
-
- @Override
- public void addTuple(Tuple t) throws IOException {
- checkAndWriteSync();
- Column col;
-
- buffer.clear();
- nullFlags.clear();
-
- for (int i = 0; i < schema.getColumnNum(); i++) {
- if (enabledStats) {
- stats.analyzeField(i, t.get(i));
- }
-
- if (t.isNull(i)) {
- nullFlags.set(i);
- } else {
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- buffer.put(t.getBoolean(i).asByte());
- break;
- case BIT:
- buffer.put(t.getByte(i).asByte());
- break;
- case CHAR:
- byte[] src = t.getChar(i).asByteArray();
- byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
- buffer.putInt(src.length);
- buffer.put(dst);
- break;
- case TEXT:
- byte [] strbytes = t.getText(i).asByteArray();
- buffer.putShort((short)strbytes.length);
- buffer.put(strbytes, 0, strbytes.length);
- break;
- case INT2:
- buffer.putShort(t.getShort(i).asInt2());
- break;
- case INT4:
- buffer.putInt(t.getInt(i).asInt4());
- break;
- case INT8:
- buffer.putLong(t.getLong(i).asInt8());
- break;
- case FLOAT4:
- buffer.putFloat(t.getFloat(i).asFloat4());
- break;
- case FLOAT8:
- buffer.putDouble(t.getDouble(i).asFloat8());
- break;
- case BLOB:
- byte [] bytes = t.getBytes(i).asByteArray();
- buffer.putShort((short)bytes.length);
- buffer.put(bytes);
- break;
- case INET4:
- buffer.put(t.getIPv4Bytes(i));
- break;
- case INET6:
- buffer.put(t.getIPv6Bytes(i));
- break;
- case NULL_TYPE:
- nullFlags.set(i);
- break;
- default:
- break;
- }
- }
- }
-
- byte[] bytes = nullFlags.toArray();
- out.writeShort(bytes.length);
- out.write(bytes);
-
- bytes = buffer.array();
- int dataLen = buffer.position();
- out.writeShort(dataLen);
- out.write(bytes, 0, dataLen);
-
- // Statistical section
- if (enabledStats) {
- stats.incrementRow();
- }
- }
-
- @Override
- public long getOffset() throws IOException {
- return out.getPos();
- }
-
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- @Override
- public void close() throws IOException {
- if (out != null) {
- if (enabledStats) {
- stats.setNumBytes(out.getPos());
- }
- sync();
- out.flush();
- out.close();
- }
- }
-
- private void sync() throws IOException {
- if (lastSyncPos != out.getPos()) {
- out.writeInt(SYNC_ESCAPE);
- out.write(sync);
- lastSyncPos = out.getPos();
- }
- }
-
- private void checkAndWriteSync() throws IOException {
- if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
- sync();
- }
- }
-
- @Override
- public TableStats getStats() {
- if (enabledStats) {
- return stats.getTableStat();
- } else {
- return null;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
deleted file mode 100644
index 9f32028..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.TimestampDatum;
-import org.apache.tajo.util.Bytes;
-
-import java.nio.ByteBuffer;
-
-public class RowStoreUtil {
- public static int[] getTargetIds(Schema inSchema, Schema outSchema) {
- int[] targetIds = new int[outSchema.getColumnNum()];
- int i = 0;
- for (Column target : outSchema.getColumns()) {
- targetIds[i] = inSchema.getColumnId(target.getQualifiedName());
- i++;
- }
-
- return targetIds;
- }
-
- public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
- out.clear();
- for (int idx = 0; idx < targetIds.length; idx++) {
- out.put(idx, in.get(targetIds[idx]));
- }
- return out;
- }
-
- public static class RowStoreDecoder {
-
- public static Tuple toTuple(Schema schema, byte [] bytes) {
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- Tuple tuple = new VTuple(schema.getColumnNum());
- Column col;
- TajoDataTypes.DataType type;
- for (int i =0; i < schema.getColumnNum(); i++) {
- col = schema.getColumn(i);
- type = col.getDataType();
- switch (type.getType()) {
- case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
- case BIT:
- byte b = bb.get();
- if(b == 0) {
- tuple.put(i, DatumFactory.createNullDatum());
- } else {
- tuple.put(i, DatumFactory.createBit(b));
- }
- break;
-
- case CHAR:
- byte c = bb.get();
- if(c == 0) {
- tuple.put(i, DatumFactory.createNullDatum());
- } else {
- tuple.put(i, DatumFactory.createChar(c));
- }
- break;
-
- case INT2:
- short s = bb.getShort();
- if(s < Short.MIN_VALUE + 1) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createInt2(s));
- }
- break;
-
- case INT4:
- case DATE:
- int i_ = bb.getInt();
- if ( i_ < Integer.MIN_VALUE + 1) {
- tuple.put(i, DatumFactory.createNullDatum());
- } else {
- tuple.put(i, DatumFactory.createFromInt4(type, i_));
- }
- break;
-
- case INT8:
- case TIME:
- case TIMESTAMP:
- long l = bb.getLong();
- if ( l < Long.MIN_VALUE + 1) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createFromInt8(type, l));
- }
- break;
-
- case FLOAT4:
- float f = bb.getFloat();
- if (Float.isNaN(f)) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createFloat4(f));
- }
- break;
-
- case FLOAT8:
- double d = bb.getDouble();
- if(Double.isNaN(d)) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createFloat8(d));
- }
- break;
-
- case TEXT:
- byte [] _string = new byte[bb.getInt()];
- bb.get(_string);
- String str = new String(_string);
- if(str.compareTo("NULL") == 0) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createText(str));
- }
- break;
-
- case BLOB:
- byte [] _bytes = new byte[bb.getInt()];
- bb.get(_bytes);
- if(Bytes.compareTo(bytes, Bytes.toBytes("NULL")) == 0) {
- tuple.put(i, DatumFactory.createNullDatum());
- } else {
- tuple.put(i, DatumFactory.createBlob(_bytes));
- }
- break;
-
- case INET4:
- byte [] _ipv4 = new byte[4];
- bb.get(_ipv4);
- tuple.put(i, DatumFactory.createInet4(_ipv4));
- break;
- case INET6:
- // TODO - to be implemented
- }
- }
- return tuple;
- }
- }
-
- public static class RowStoreEncoder {
-
- public static byte [] toBytes(Schema schema, Tuple tuple) {
- int size = StorageUtil.getRowByteSize(schema);
- ByteBuffer bb = ByteBuffer.allocate(size);
- Column col;
- for (int i = 0; i < schema.getColumnNum(); i++) {
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case BOOLEAN: bb.put(tuple.get(i).asByte()); break;
- case BIT: bb.put(tuple.get(i).asByte()); break;
- case CHAR: bb.put(tuple.get(i).asByte()); break;
- case INT2: bb.putShort(tuple.get(i).asInt2()); break;
- case INT4: bb.putInt(tuple.get(i).asInt4()); break;
- case INT8: bb.putLong(tuple.get(i).asInt8()); break;
- case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break;
- case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break;
- case TEXT:
- byte [] _string = tuple.get(i).asByteArray();
- bb.putInt(_string.length);
- bb.put(_string);
- break;
- case DATE: bb.putInt(tuple.get(i).asInt4()); break;
- case TIMESTAMP: bb.putLong(((TimestampDatum)tuple.get(i)).getMillis()); break;
- case BLOB:
- byte [] bytes = tuple.get(i).asByteArray();
- bb.putInt(bytes.length);
- bb.put(bytes);
- break;
- case INET4:
- byte [] ipBytes = tuple.getIPv4Bytes(i);
- bb.put(ipBytes);
- break;
- case INET6: bb.put(tuple.getIPv6Bytes(i)); break;
- default:
- }
- }
-
- bb.flip();
- byte [] buf = new byte [bb.limit()];
- bb.get(buf);
- return buf;
- }
- }
-}