You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/05/18 01:54:34 UTC
[1/2] tajo git commit: TAJO-2146: Fragment interface cleanup.
Repository: tajo
Updated Branches:
refs/heads/master 3de377461 -> 1c44272bf
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
deleted file mode 100644
index 958e1c6..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.exception.TajoRuntimeException;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.BitArray;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-
-public class RowFile {
- public static final Log LOG = LogFactory.getLog(RowFile.class);
-
- private static final int SYNC_ESCAPE = -1;
- private static final int SYNC_HASH_SIZE = 16;
- private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
- private final static int DEFAULT_BUFFER_SIZE = 65535;
- public static int SYNC_INTERVAL;
-
- public static class RowFileScanner extends FileScanner {
- private FileSystem fs;
- private FSDataInputStream in;
- private Tuple tuple;
-
- private byte[] sync = new byte[SYNC_HASH_SIZE];
- private byte[] checkSync = new byte[SYNC_HASH_SIZE];
- private long start, end;
-
- private ByteBuffer buffer;
- private final int tupleHeaderSize;
- private BitArray nullFlags;
- private long bufferStartPos;
-
- public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
- throws IOException {
- super(conf, schema, meta, fragment);
-
- SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
- ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE;
-
- nullFlags = new BitArray(schema.size());
- tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
- this.start = this.fragment.getStartKey();
- this.end = this.start + this.fragment.getLength();
- }
-
- public void init() throws IOException {
- // set default page size.
- fs = fragment.getPath().getFileSystem(conf);
- in = fs.open(fragment.getPath());
- buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.size());
- buffer.flip();
-
- readHeader();
-
- // find the correct position from the start
- if (this.start > in.getPos()) {
- long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
- in.seek(realStart);
- }
- bufferStartPos = in.getPos();
- fillBuffer();
-
- if (start != 0) {
- // TODO: improve
- boolean syncFound = false;
- while (!syncFound) {
- if (buffer.remaining() < SYNC_SIZE) {
- fillBuffer();
- }
- buffer.mark();
- syncFound = checkSync();
- if (!syncFound) {
- buffer.reset();
- buffer.get(); // proceed one byte
- }
- }
- bufferStartPos += buffer.position();
- buffer.compact();
- buffer.flip();
- }
-
- tuple = new VTuple(schema.size());
-
- super.init();
- }
-
- private void readHeader() throws IOException {
- SYNC_INTERVAL = in.readInt();
- StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
- }
-
- /**
- * Find the sync from the front of the buffer
- *
- * @return return true if it succeeds to find the sync.
- * @throws java.io.IOException
- */
- private boolean checkSync() throws IOException {
- buffer.getInt(); // escape
- buffer.get(checkSync, 0, SYNC_HASH_SIZE); // sync
- return Arrays.equals(checkSync, sync);
- }
-
- private int fillBuffer() throws IOException {
- bufferStartPos += buffer.position();
- buffer.compact();
- int remain = buffer.remaining();
- int read = in.read(buffer);
- if (read == -1) {
- buffer.flip();
- return read;
- } else {
- int totalRead = read;
- if (remain > totalRead) {
- read = in.read(buffer);
- totalRead += read > 0 ? read : 0;
- }
- buffer.flip();
- return totalRead;
- }
- }
-
- @Override
- public Tuple next() throws IOException {
- while (buffer.remaining() < SYNC_SIZE) {
- if (fillBuffer() < 0) {
- return null;
- }
- }
-
- buffer.mark();
- if (!checkSync()) {
- buffer.reset();
- } else {
- if (bufferStartPos + buffer.position() > end) {
- return null;
- }
- }
-
- while (buffer.remaining() < tupleHeaderSize) {
- if (fillBuffer() < 0) {
- return null;
- }
- }
-
- int i;
-
- int nullFlagSize = buffer.getShort();
- byte[] nullFlagBytes = new byte[nullFlagSize];
- buffer.get(nullFlagBytes, 0, nullFlagSize);
- nullFlags = new BitArray(nullFlagBytes);
- int tupleSize = buffer.getShort();
-
- while (buffer.remaining() < (tupleSize)) {
- if (fillBuffer() < 0) {
- return null;
- }
- }
-
- Datum datum;
- Column col;
- for (i = 0; i < schema.size(); i++) {
- if (!nullFlags.get(i)) {
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case BOOLEAN :
- datum = DatumFactory.createBool(buffer.get());
- tuple.put(i, datum);
- break;
-
- case BIT:
- datum = DatumFactory.createBit(buffer.get());
- tuple.put(i, datum );
- break;
-
- case CHAR :
- int realLen = buffer.getInt();
- byte[] buf = new byte[col.getDataType().getLength()];
- buffer.get(buf);
- byte[] charBuf = Arrays.copyOf(buf, realLen);
- tuple.put(i, DatumFactory.createChar(charBuf));
- break;
-
- case INT2 :
- datum = DatumFactory.createInt2(buffer.getShort());
- tuple.put(i, datum );
- break;
-
- case INT4 :
- datum = DatumFactory.createInt4(buffer.getInt());
- tuple.put(i, datum );
- break;
-
- case INT8 :
- datum = DatumFactory.createInt8(buffer.getLong());
- tuple.put(i, datum );
- break;
-
- case FLOAT4 :
- datum = DatumFactory.createFloat4(buffer.getFloat());
- tuple.put(i, datum);
- break;
-
- case FLOAT8 :
- datum = DatumFactory.createFloat8(buffer.getDouble());
- tuple.put(i, datum);
- break;
-
- case TEXT:
- short bytelen = buffer.getShort();
- byte[] strbytes = new byte[bytelen];
- buffer.get(strbytes, 0, bytelen);
- datum = DatumFactory.createText(strbytes);
- tuple.put(i, datum);
- break;
-
- case BLOB:
- short bytesLen = buffer.getShort();
- byte [] bytesBuf = new byte[bytesLen];
- buffer.get(bytesBuf);
- datum = DatumFactory.createBlob(bytesBuf);
- tuple.put(i, datum);
- break;
-
- default:
- break;
- }
- } else {
- tuple.put(i, DatumFactory.createNullDatum());
- }
- }
- return tuple;
- }
-
- @Override
- public void reset() throws IOException {
- init();
- }
-
- @Override
- public void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
-
- @Override
- public boolean isProjectable() {
- return false;
- }
-
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- @Override
- public void setFilter(EvalNode filter) {
- throw new TajoRuntimeException(new UnsupportedException());
- }
-
- @Override
- public boolean isSplittable(){
- return true;
- }
- }
-
- public static class RowFileAppender extends FileAppender {
- private FSDataOutputStream out;
- private long lastSyncPos;
- private FileSystem fs;
- private byte[] sync;
- private ByteBuffer buffer;
-
- private BitArray nullFlags;
- // statistics
- private TableStatistics stats;
- private ShuffleType shuffleType;
-
- public RowFileAppender(Configuration conf, final TaskAttemptId taskAttemptId,
- final Schema schema, final TableMeta meta, final Path workDir)
- throws IOException {
- super(conf, taskAttemptId, schema, meta, workDir);
- }
-
- public void init() throws IOException {
- SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
- ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal);
- fs = path.getFileSystem(conf);
-
- if (!fs.exists(path.getParent())) {
- throw new FileNotFoundException(path.toString());
- }
-
- if (fs.exists(path)) {
- throw new AlreadyExistsStorageException(path);
- }
-
- sync = new byte[SYNC_HASH_SIZE];
- lastSyncPos = 0;
-
- out = fs.create(path);
-
- MessageDigest md;
- try {
- md = MessageDigest.getInstance("MD5");
- md.update((path.toString()+System.currentTimeMillis()).getBytes());
- sync = md.digest();
- } catch (NoSuchAlgorithmException e) {
- LOG.error(e);
- }
-
- writeHeader();
-
- buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-
- nullFlags = new BitArray(schema.size());
-
- if (tableStatsEnabled) {
- this.stats = new TableStatistics(this.schema, columnStatsEnabled);
- this.shuffleType = PlannerUtil.getShuffleType(
- meta.getProperty(StorageConstants.SHUFFLE_TYPE,
- PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)));
- }
- }
-
- private void writeHeader() throws IOException {
- out.writeInt(SYNC_INTERVAL);
- out.write(sync);
- out.flush();
- lastSyncPos = out.getPos();
- }
-
- @Override
- public void addTuple(Tuple t) throws IOException {
- checkAndWriteSync();
- Column col;
-
- buffer.clear();
- nullFlags.clear();
-
- for (int i = 0; i < schema.size(); i++) {
- if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
- // it is to calculate min/max values, and it is only used for the intermediate file.
- stats.analyzeField(i, t);
- }
-
- if (t.isBlankOrNull(i)) {
- nullFlags.set(i);
- } else {
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- buffer.put(t.getByte(i));
- break;
- case BIT:
- buffer.put(t.getByte(i));
- break;
- case CHAR:
- byte[] src = t.getBytes(i);
- byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
- buffer.putInt(src.length);
- buffer.put(dst);
- break;
- case TEXT:
- byte [] strbytes = t.getBytes(i);
- buffer.putShort((short)strbytes.length);
- buffer.put(strbytes, 0, strbytes.length);
- break;
- case INT2:
- buffer.putShort(t.getInt2(i));
- break;
- case INT4:
- buffer.putInt(t.getInt4(i));
- break;
- case INT8:
- buffer.putLong(t.getInt8(i));
- break;
- case FLOAT4:
- buffer.putFloat(t.getFloat4(i));
- break;
- case FLOAT8:
- buffer.putDouble(t.getFloat8(i));
- break;
- case BLOB:
- byte [] bytes = t.getBytes(i);
- buffer.putShort((short)bytes.length);
- buffer.put(bytes);
- break;
- case NULL_TYPE:
- nullFlags.set(i);
- break;
- default:
- break;
- }
- }
- }
-
- byte[] bytes = nullFlags.toArray();
- out.writeShort(bytes.length);
- out.write(bytes);
-
- bytes = buffer.array();
- int dataLen = buffer.position();
- out.writeShort(dataLen);
- out.write(bytes, 0, dataLen);
-
- // Statistical section
- if (tableStatsEnabled) {
- stats.incrementRow();
- }
- }
-
- @Override
- public long getOffset() throws IOException {
- return out.getPos();
- }
-
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- @Override
- public void close() throws IOException {
- if (out != null) {
- if (tableStatsEnabled) {
- stats.setNumBytes(out.getPos());
- }
- sync();
- out.flush();
- IOUtils.cleanup(LOG, out);
- }
- }
-
- private void sync() throws IOException {
- if (lastSyncPos != out.getPos()) {
- out.writeInt(SYNC_ESCAPE);
- out.write(sync);
- lastSyncPos = out.getPos();
- }
- }
-
- private void checkAndWriteSync() throws IOException {
- if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
- sync();
- }
- }
-
- @Override
- public TableStats getStats() {
- if (tableStatsEnabled) {
- return stats.getTableStat();
- } else {
- return null;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
index 8f18c7a..7bdf0cb 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -19,161 +19,61 @@
package org.apache.tajo.storage.fragment;
import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.BuiltinStorages;
-import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto;
+import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
- @Expose private String tableName; // required
- @Expose private Path uri; // required
- @Expose public Long startOffset; // required
- @Expose public Long length; // required
-
- private String[] hosts; // Datanode hostnames
- @Expose private int[] diskIds;
-
- public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
- FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
- builder.mergeFrom(raw);
- builder.build();
- init(builder.build());
- }
+/**
+ * Fragment for file systems.
+ */
+public class FileFragment extends Fragment<Long> {
+ private Integer[] diskIds; // disk volume ids
public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
throws IOException {
- this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
+ this(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
}
- public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) {
- this.set(tableName, uri, start, length, hosts, diskIds);
+ public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, Integer[] diskIds) {
+ super(BuiltinFragmentKinds.FILE, uri.toUri(), tableName, start, start + length, length, hosts);
+ this.diskIds = diskIds;
}
+
// Non splittable
public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
- this.set(tableName, uri, start, length, hosts, null);
+ this(tableName, uri, start, length, hosts, null);
}
public FileFragment(String fragmentId, Path path, long start, long length) {
- this.set(fragmentId, path, start, length, null, null);
- }
-
- public FileFragment(FileFragmentProto proto) {
- init(proto);
- }
-
- private void init(FileFragmentProto proto) {
- int[] diskIds = new int[proto.getDiskIdsList().size()];
- int i = 0;
- for(Integer eachValue: proto.getDiskIdsList()) {
- diskIds[i++] = eachValue;
- }
- List<String> var = proto.getHostsList();
- this.set(proto.getId(), new Path(proto.getPath()),
- proto.getStartOffset(), proto.getLength(),
- var.toArray(new String[var.size()]),
- diskIds);
- }
-
- private void set(String tableName, Path path, long start,
- long length, String[] hosts, int[] diskIds) {
- this.tableName = tableName;
- this.uri = path;
- this.startOffset = start;
- this.length = length;
- this.hosts = hosts;
- this.diskIds = diskIds;
- }
-
-
- /**
- * Get the list of hosts (hostname) hosting this block
- */
- public String[] getHosts() {
- if (hosts == null) {
- this.hosts = new String[0];
- }
- return hosts;
+ this(fragmentId, path, start, length, null, null);
}
/**
* Get the list of Disk Ids
* Unknown disk is -1. Others 0 ~ N
*/
- public int[] getDiskIds() {
+ public Integer[] getDiskIds() {
if (diskIds == null) {
- this.diskIds = new int[getHosts().length];
- Arrays.fill(this.diskIds, -1);
+ this.diskIds = new Integer[getHostNames().size()];
+ Arrays.fill(this.diskIds, DataLocation.UNKNOWN_VOLUME_ID);
}
return diskIds;
}
- public void setDiskIds(int[] diskIds){
+ public void setDiskIds(Integer[] diskIds){
this.diskIds = diskIds;
}
- @Override
- public String getTableName() {
- return this.tableName;
- }
-
public Path getPath() {
- return this.uri;
+ return new Path(uri);
}
public void setPath(Path path) {
- this.uri = path;
- }
-
- public Long getStartKey() {
- return this.startOffset;
- }
-
- @Override
- public String getKey() {
- return this.uri.toString();
- }
-
- @Override
- public long getLength() {
- return this.length;
- }
-
- @Override
- public boolean isEmpty() {
- return this.length <= 0;
- }
- /**
- *
- * The offset range of tablets <b>MUST NOT</b> be overlapped.
- *
- * @param t
- * @return If the table paths are not same, return -1.
- */
- @Override
- public int compareTo(FileFragment t) {
- if (getPath().equals(t.getPath())) {
- long diff = this.getStartKey() - t.getStartKey();
- if (diff < 0) {
- return -1;
- } else if (diff > 0) {
- return 1;
- } else {
- return 0;
- }
- } else {
- return -1;
- }
+ this.uri = path.toUri();
}
@Override
@@ -191,48 +91,20 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
@Override
public int hashCode() {
- return Objects.hashCode(tableName, uri, startOffset, length);
+ return Objects.hashCode(inputSourceId, uri, startKey, endKey, length, diskIds, hostNames);
}
-
+
+ @Override
public Object clone() throws CloneNotSupportedException {
FileFragment frag = (FileFragment) super.clone();
- frag.tableName = tableName;
- frag.uri = uri;
frag.diskIds = diskIds;
- frag.hosts = hosts;
-
return frag;
}
@Override
public String toString() {
- return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
+ return "\"fragment\": {\"id\": \""+ inputSourceId +"\", \"path\": "
+getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
+ getLength() + "}" ;
}
-
- public FragmentProto getProto() {
- FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
- builder.setId(this.tableName);
- builder.setStartOffset(this.startOffset);
- builder.setLength(this.length);
- builder.setPath(this.uri.toString());
- if(diskIds != null) {
- List<Integer> idList = new ArrayList<>();
- for(int eachId: diskIds) {
- idList.add(eachId);
- }
- builder.addAllDiskIds(idList);
- }
-
- if(hosts != null) {
- builder.addAllHosts(Arrays.asList(hosts));
- }
-
- FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
- fragmentBuilder.setId(this.tableName);
- fragmentBuilder.setDataFormat(BuiltinStorages.TEXT);
- fragmentBuilder.setContents(builder.buildPartial().toByteString());
- return fragmentBuilder.build();
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java
new file mode 100644
index 0000000..926e5fe
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.protobuf.GeneratedMessage.Builder;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileFragmentSerde implements FragmentSerde<FileFragment, FileFragmentProto> {
+
+ @Override
+ public Builder newBuilder() {
+ return FileFragmentProto.newBuilder();
+ }
+
+ @Override
+ public FileFragmentProto serialize(FileFragment fragment) {
+ FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
+ builder.setId(fragment.inputSourceId);
+ builder.setStartOffset(fragment.startKey);
+ builder.setLength(fragment.length);
+ builder.setPath(fragment.getPath().toString());
+ if(fragment.getDiskIds() != null) {
+ List<Integer> idList = new ArrayList<>();
+ for(int eachId: fragment.getDiskIds()) {
+ idList.add(eachId);
+ }
+ builder.addAllDiskIds(idList);
+ }
+
+ if(fragment.hostNames != null) {
+ builder.addAllHosts(fragment.hostNames);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public FileFragment deserialize(FileFragmentProto proto) {
+ return new FileFragment(
+ proto.getId(),
+ new Path(proto.getPath()),
+ proto.getStartOffset(),
+ proto.getLength(),
+ proto.getHostsList().toArray(new String[proto.getHostsCount()]),
+ proto.getDiskIdsList().toArray(new Integer[proto.getDiskIdsCount()]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index 964a58a..a2688b1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -88,7 +88,7 @@ public class DelimitedLineReader implements Closeable {
}
pos = startOffset = fragment.getStartKey();
- end = startOffset + fragment.getLength();
+ end = fragment.getEndKey();
if (codec != null) {
fis = fs.open(fragment.getPath());
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index e112b0d..d52f46d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -291,7 +291,7 @@ public class DelimitedTextFile {
}
startOffset = this.fragment.getStartKey();
- endOffset = startOffset + fragment.getLength();
+ endOffset = this.fragment.getEndKey();
errorTorrenceMaxNum =
Integer.parseInt(meta.getProperty(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
index 58fea2b..a29e86b 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
@@ -99,7 +99,7 @@ public class OrcRecordReader implements Closeable {
long rows = 0;
long skippedRows = 0;
long offset = fragment.getStartKey();
- long maxOffset = fragment.getStartKey() + fragment.getLength();
+ long maxOffset = fragment.getEndKey();
for(StripeInformation stripe: stripes) {
long stripeStart = stripe.getOffset();
if (offset > stripeStart) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index ad45d18..42b0b23 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -159,14 +159,14 @@ public class TestFileTablespace {
splits.addAll(space.getSplits("data", meta, schema, false, partitions.toArray(new Path[partitions.size()])));
assertEquals(testCount, splits.size());
// -1 is unknown volumeId
- assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ assertEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue());
splits.clear();
splits.addAll(space.getSplits("data", meta, schema, false,
partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
assertEquals(testCount / 2, splits.size());
- assertEquals(1, splits.get(0).getHosts().length);
- assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ assertEquals(1, splits.get(0).getHostNames().size());
+ assertEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue());
fs.close();
} finally {
@@ -259,9 +259,9 @@ public class TestFileTablespace {
splits.addAll(sm.getSplits("data", meta, schema, false, tablePath));
assertEquals(testCount, splits.size());
- assertEquals(2, splits.get(0).getHosts().length);
+ assertEquals(2, splits.get(0).getHostNames().size());
assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
- assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ assertNotEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue());
fs.close();
} finally {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 0e6fde5..7a9b074 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage.index;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.catalog.*;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
@@ -74,8 +75,8 @@ public class TestBSTIndex {
@Parameters(name = "{index}: {0}")
public static Collection<Object[]> generateParameters() {
return Arrays.asList(new Object[][]{
- {"RAW"},
- {"TEXT"}
+ {BuiltinStorages.RAW},
+ {BuiltinStorages.TEXT}
});
}
@@ -128,7 +129,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -152,7 +153,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp);
reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@ -233,7 +234,7 @@ public class TestBSTIndex {
keySchema, comp);
reader.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@ -298,7 +299,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -372,7 +373,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -396,7 +397,7 @@ public class TestBSTIndex {
keySchema, comp);
reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple result;
@@ -466,7 +467,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -490,7 +491,7 @@ public class TestBSTIndex {
keySchema, comp);
reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple result;
@@ -549,7 +550,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -575,7 +576,7 @@ public class TestBSTIndex {
keySchema, comp);
reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
tuple.put(0, DatumFactory.createInt8(0));
@@ -636,7 +637,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -744,7 +745,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -828,7 +829,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -854,7 +855,7 @@ public class TestBSTIndex {
keySchema, comp);
reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
for (int i = (TUPLE_NUM - 1); i > 0; i--) {
@@ -921,7 +922,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -950,7 +951,7 @@ public class TestBSTIndex {
assertEquals(comp, reader.getComparator());
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple result;
@@ -1023,7 +1024,7 @@ public class TestBSTIndex {
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
@@ -1047,7 +1048,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp);
reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
- getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 4f6b566..cca7920 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -114,7 +114,7 @@ public class TestSingleCSVFileBSTIndex {
creater.init();
SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
- .getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ .getSeekableScanner(meta, schema, tablet, schema);
fileScanner.init();
Tuple keyTuple;
long offset;
@@ -139,7 +139,7 @@ public class TestSingleCSVFileBSTIndex {
"FindValueInCSV.idx"), keySchema, comp);
reader.init();
fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
- .getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ .getSeekableScanner(meta, schema, tablet, schema);
fileScanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
tuple.put(0, DatumFactory.createInt8(i));
@@ -206,7 +206,7 @@ public class TestSingleCSVFileBSTIndex {
creater.init();
SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
- .getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ .getSeekableScanner(meta, schema, tablet, schema);
fileScanner.init();
Tuple keyTuple;
long offset;
@@ -228,7 +228,7 @@ public class TestSingleCSVFileBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
reader.init();
fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
- .getSeekableScanner(meta, schema, tablet.getProto(), schema);
+ .getSeekableScanner(meta, schema, tablet, schema);
fileScanner.init();
Tuple result;
for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 3283f9f..7ae58aa 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -39,44 +39,28 @@
<!--- Fragment Class Configurations -->
<property>
- <name>tajo.storage.fragment.text.class</name>
+ <name>tajo.storage.fragment.kind.file</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.json.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.raw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.draw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.serde.file</name>
+ <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.rcfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.serde.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.row.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.parquet.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.orc.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.sequencefile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.avro.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.serde.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragmentSerde</value>
</property>
<!--- Scanner Handler -->
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
index 0088504..20c1aca 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
@@ -18,90 +18,34 @@
package org.apache.tajo.storage.jdbc;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.jdbc.JdbcFragmentProtos.JdbcFragmentProto;
-import java.util.Arrays;
+import java.net.URI;
+import java.util.List;
-public class JdbcFragment implements Fragment, Comparable<JdbcFragment>, Cloneable {
- String uri;
- String inputSourceId;
- String [] hostNames;
-
-
- public JdbcFragment(ByteString raw) throws InvalidProtocolBufferException {
- JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
- builder.mergeFrom(raw);
- builder.build();
- init(builder.build());
- }
+/**
+ * Fragment for the systems which connect to Tajo via the JDBC interface.
+ */
+public class JdbcFragment extends Fragment<Long> {
- public JdbcFragment(String inputSourceId, String uri) {
- this.inputSourceId = inputSourceId;
- this.uri = uri;
- this.hostNames = extractHosts(uri);
+ // TODO: set start and end keys properly
+ public JdbcFragment(String inputSourceId, URI uri) {
+ super(BuiltinFragmentKinds.JDBC, uri, inputSourceId, null, null, TajoConstants.UNKNOWN_LENGTH, extractHosts(uri));
}
- private void init(JdbcFragmentProto proto) {
- this.uri = proto.getUri();
- this.inputSourceId = proto.getInputSourceId();
- this.hostNames = proto.getHostsList().toArray(new String [proto.getHostsCount()]);
+ public JdbcFragment(String inputSourceId, URI uri, List<String> hostNames) {
+ super(BuiltinFragmentKinds.JDBC, uri, inputSourceId, null, null, TajoConstants.UNKNOWN_LENGTH,
+ hostNames.toArray(new String[hostNames.size()]));
}
- private String [] extractHosts(String uri) {
+ private static String[] extractHosts(URI uri) {
return new String[] {ConnectionInfo.fromURI(uri).host};
}
@Override
- public String getTableName() {
- return inputSourceId;
- }
-
- public String getUri() {
- return uri;
- }
-
- @Override
- public long getLength() {
- return 0;
- }
-
- @Override
- public String getKey() {
- return null;
- }
-
- @Override
- public String[] getHosts() {
- return hostNames;
- }
-
- @Override
public boolean isEmpty() {
return false;
}
-
- @Override
- public CatalogProtos.FragmentProto getProto() {
- JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
- builder.setInputSourceId(this.inputSourceId);
- builder.setUri(this.uri);
- if(hostNames != null) {
- builder.addAllHosts(Arrays.asList(hostNames));
- }
-
- CatalogProtos.FragmentProto.Builder fragmentBuilder = CatalogProtos.FragmentProto.newBuilder();
- fragmentBuilder.setId(this.inputSourceId);
- fragmentBuilder.setDataFormat("JDBC");
- fragmentBuilder.setContents(builder.buildPartial().toByteString());
- return fragmentBuilder.build();
- }
-
- @Override
- public int compareTo(JdbcFragment o) {
- return this.uri.compareTo(o.uri);
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerde.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerde.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerde.java
new file mode 100644
index 0000000..9e26d80
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerde.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.protobuf.GeneratedMessage.Builder;
+import org.apache.tajo.storage.fragment.FragmentSerde;
+import org.apache.tajo.storage.jdbc.JdbcFragmentProtos.JdbcFragmentProto;
+
+import java.net.URI;
+
+public class JdbcFragmentSerde implements FragmentSerde<JdbcFragment, JdbcFragmentProto> {
+
+ @Override
+ public Builder newBuilder() {
+ return JdbcFragmentProto.newBuilder();
+ }
+
+ @Override
+ public JdbcFragmentProto serialize(JdbcFragment fragment) {
+ return JdbcFragmentProto.newBuilder()
+ .setInputSourceId(fragment.getInputSourceId())
+ .setUri(fragment.getUri().toASCIIString())
+ .addAllHosts(fragment.getHostNames())
+ .build();
+ }
+
+ @Override
+ public JdbcFragment deserialize(JdbcFragmentProto proto) {
+ return new JdbcFragment(proto.getInputSourceId(), URI.create(proto.getUri()), proto.getHostsList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
index a58b861..76ed3ab 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
@@ -258,7 +258,7 @@ public abstract class JdbcScanner implements Scanner {
private ResultSetIterator executeQueryAndGetIter() {
try {
LOG.info("Generated SQL: " + generatedSql);
- Connection conn = DriverManager.getConnection(fragment.uri, connProperties);
+ Connection conn = DriverManager.getConnection(fragment.getUri().toASCIIString(), connProperties);
Statement statement = conn.createStatement();
ResultSet resultset = statement.executeQuery(generatedSql);
return new ResultSetIterator((resultset));
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index 7a630b2..c353831 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -124,7 +124,7 @@ public abstract class JdbcTablespace extends Tablespace {
TableDesc tableDesc,
boolean requireSorted,
@Nullable EvalNode filterCondition) throws IOException {
- return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri().toASCIIString()));
+ return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri()));
}
@Override
[2/2] tajo git commit: TAJO-2146: Fragment interface cleanup.
Posted by ji...@apache.org.
TAJO-2146: Fragment interface cleanup.
Closes #1015
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1c44272b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1c44272b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1c44272b
Branch: refs/heads/master
Commit: 1c44272bff0fc0022a1c8ce060b70d11a30c59e0
Parents: 3de3774
Author: Jihoon Son <ji...@apache.org>
Authored: Wed May 18 10:54:02 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed May 18 10:54:02 2016 +0900
----------------------------------------------------------------------
CHANGES | 4 +-
.../src/main/proto/CatalogProtos.proto | 2 +-
.../org/apache/tajo/datum/TestInt4Datum.java | 6 +-
.../tajo/engine/query/TestHBaseTable.java | 44 +-
.../apache/tajo/storage/TestFileFragment.java | 13 +-
.../org/apache/tajo/storage/TestRowFile.java | 140 ------
.../engine/planner/PhysicalPlannerImpl.java | 4 +-
.../planner/physical/BSTIndexScanExec.java | 2 +-
.../planner/physical/ExternalSortExec.java | 4 +-
.../planner/physical/IndexExecutorUtil.java | 5 +-
.../planner/physical/PhysicalPlanUtil.java | 3 +-
.../engine/planner/physical/SeqScanExec.java | 2 +-
.../engine/planner/physical/StoreIndexExec.java | 2 +-
.../exec/NonForwardQueryResultFileScanner.java | 4 +-
.../java/org/apache/tajo/querymaster/Task.java | 19 +-
.../apache/tajo/worker/TaskAttemptContext.java | 15 +-
.../org/apache/tajo/storage/Tablespace.java | 15 +
.../storage/fragment/BuiltinFragmentKinds.java | 25 +
.../apache/tajo/storage/fragment/Fragment.java | 148 +++++-
.../storage/fragment/FragmentConvertor.java | 99 ++--
.../tajo/storage/fragment/FragmentSerde.java | 54 ++
.../src/main/resources/storage-default.xml | 46 +-
.../src/test/resources/storage-default.xml | 46 +-
.../tajo/storage/hbase/HBaseFragment.java | 187 +++----
.../tajo/storage/hbase/HBaseFragmentSerde.java | 60 +++
.../apache/tajo/storage/hbase/HBaseScanner.java | 20 +-
.../tajo/storage/hbase/HBaseTablespace.java | 4 +-
.../org/apache/tajo/storage/FileTablespace.java | 4 +-
.../java/org/apache/tajo/storage/RawFile.java | 2 +-
.../java/org/apache/tajo/storage/RowFile.java | 502 -------------------
.../tajo/storage/fragment/FileFragment.java | 174 +------
.../storage/fragment/FileFragmentSerde.java | 66 +++
.../tajo/storage/text/DelimitedLineReader.java | 2 +-
.../tajo/storage/text/DelimitedTextFile.java | 2 +-
.../storage/thirdparty/orc/OrcRecordReader.java | 2 +-
.../apache/tajo/storage/TestFileTablespace.java | 10 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 41 +-
.../index/TestSingleCSVFileBSTIndex.java | 8 +-
.../src/test/resources/storage-default.xml | 38 +-
.../apache/tajo/storage/jdbc/JdbcFragment.java | 86 +---
.../tajo/storage/jdbc/JdbcFragmentSerde.java | 47 ++
.../apache/tajo/storage/jdbc/JdbcScanner.java | 2 +-
.../tajo/storage/jdbc/JdbcTablespace.java | 2 +-
43 files changed, 705 insertions(+), 1256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8f6c0fe..beee5be 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,7 +4,7 @@ Release 0.12.0 - unreleased
NEW FEATURES
- TAJO-1686: Allow Tajo to use Hive UDF. (jihoon)
+ TAJO-1686: Allow Tajo to use Hive UDF. (Jongyoung Park via jihoon)
TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
@@ -14,6 +14,8 @@ Release 0.12.0 - unreleased
IMPROVEMENT
+ TAJO-2146: Fragment interface cleanup. (jihoon)
+
TAJO-2129: Apply new type implementation to Schema and Catalog. (hyunsik)
TAJO-2071: Supporting DATE type in Parquet format.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index e79bc75..b42cf58 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -75,7 +75,7 @@ message SchemaProto {
message FragmentProto {
required string id = 1;
- required string data_format = 2;
+ required string kind = 2;
required bytes contents = 3;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
index 559bed3..294f8bb 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
@@ -18,12 +18,10 @@
package org.apache.tajo.datum;
-import org.junit.Test;
import org.apache.tajo.common.TajoDataTypes;
+import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestInt4Datum {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 8914e3b..5819179 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -555,8 +555,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
Tablespace tablespace = TablespaceManager.getByName("cluster1");
List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(1, fragments.size());
- assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
- assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
+ assertEquals("021", ((HBaseFragment)fragments.get(0)).getStartKey().toString());
+ assertEquals("021" + postFix, ((HBaseFragment)fragments.get(0)).getEndKey().toString());
// where rk >= '020' and rk <= '055'
EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -569,12 +569,12 @@ public class TestHBaseTable extends QueryTestCaseBase {
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(2, fragments.size());
HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
- assertEquals("020", new String(fragment1.getStartRow()));
- assertEquals("040", new String(fragment1.getStopRow()));
+ assertEquals("020", fragment1.getStartKey().toString());
+ assertEquals("040", fragment1.getEndKey().toString());
HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
- assertEquals("040", new String(fragment2.getStartRow()));
- assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+ assertEquals("040", fragment2.getStartKey().toString());
+ assertEquals("055" + postFix, fragment2.getEndKey().toString());
// where (rk >= '020' and rk <= '055') or rk = '075'
EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -584,16 +584,16 @@ public class TestHBaseTable extends QueryTestCaseBase {
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
- assertEquals("020", new String(fragment1.getStartRow()));
- assertEquals("040", new String(fragment1.getStopRow()));
+ assertEquals("020", fragment1.getStartKey().toString());
+ assertEquals("040", fragment1.getEndKey().toString());
fragment2 = (HBaseFragment) fragments.get(1);
- assertEquals("040", new String(fragment2.getStartRow()));
- assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+ assertEquals("040", fragment2.getStartKey().toString());
+ assertEquals("055" + postFix, fragment2.getEndKey().toString());
HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
- assertEquals("075", new String(fragment3.getStartRow()));
- assertEquals("075" + postFix, new String(fragment3.getStopRow()));
+ assertEquals("075", fragment3.getStartKey().toString());
+ assertEquals("075" + postFix, fragment3.getEndKey().toString());
// where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
@@ -608,16 +608,16 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
- assertEquals("020", new String(fragment1.getStartRow()));
- assertEquals("040", new String(fragment1.getStopRow()));
+ assertEquals("020", fragment1.getStartKey().toString());
+ assertEquals("040", fragment1.getEndKey().toString());
fragment2 = (HBaseFragment) fragments.get(1);
- assertEquals("040", new String(fragment2.getStartRow()));
- assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+ assertEquals("040", fragment2.getStartKey().toString());
+ assertEquals("055" + postFix, fragment2.getEndKey().toString());
fragment3 = (HBaseFragment) fragments.get(2);
- assertEquals("072", new String(fragment3.getStartRow()));
- assertEquals("078" + postFix, new String(fragment3.getStopRow()));
+ assertEquals("072", fragment3.getStartKey().toString());
+ assertEquals("078" + postFix, fragment3.getEndKey().toString());
// where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -631,12 +631,12 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertEquals(2, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
- assertEquals("020", new String(fragment1.getStartRow()));
- assertEquals("040", new String(fragment1.getStopRow()));
+ assertEquals("020", fragment1.getStartKey().toString());
+ assertEquals("040", fragment1.getEndKey().toString());
fragment2 = (HBaseFragment) fragments.get(1);
- assertEquals("040", new String(fragment2.getStartRow()));
- assertEquals("059" + postFix, new String(fragment2.getStopRow()));
+ assertEquals("040", fragment2.getStartKey().toString());
+ assertEquals("059" + postFix, fragment2.getEndKey().toString());
}
@Test
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
index 86f7d1a..cccff70 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
@@ -20,6 +20,8 @@ package org.apache.tajo.storage;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.CommonTestingUtil;
@@ -34,10 +36,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestFileFragment {
+ private TajoConf conf;
private Path path;
@Before
public final void setUp() throws Exception {
+ conf = new TajoConf();
path = CommonTestingUtil.getTestDir();
}
@@ -45,7 +49,7 @@ public class TestFileFragment {
public final void testGetAndSetFields() {
FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
- assertEquals("table1_1", fragment1.getTableName());
+ assertEquals("table1_1", fragment1.getInputSourceId());
assertEquals(new Path(path, "table0"), fragment1.getPath());
assertTrue(0 == fragment1.getStartKey());
assertTrue(500 == fragment1.getLength());
@@ -55,8 +59,9 @@ public class TestFileFragment {
public final void testGetProtoAndRestore() {
FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
- FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto());
- assertEquals("table1_1", fragment1.getTableName());
+ FileFragment fragment1 = FragmentConvertor.convert(conf, BuiltinFragmentKinds.FILE,
+ FragmentConvertor.toFragmentProto(conf, fragment));
+ assertEquals("table1_1", fragment1.getInputSourceId());
assertEquals(new Path(path, "table0"), fragment1.getPath());
assertTrue(0 == fragment1.getStartKey());
assertTrue(500 == fragment1.getLength());
@@ -73,7 +78,7 @@ public class TestFileFragment {
Arrays.sort(tablets);
for(int i = 0; i < num; i++) {
- assertEquals("tablet1_"+i, tablets[i].getTableName());
+ assertEquals("tablet1_"+i, tablets[i].getInputSourceId());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
deleted file mode 100644
index 0173f59..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaBuilder;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestRowFile {
- private static final Log LOG = LogFactory.getLog(TestRowFile.class);
-
- private TajoTestingCluster cluster;
- private TajoConf conf;
-
- @Before
- public void setup() throws Exception {
- cluster = TpchTestBase.getInstance().getTestingCluster();
- conf = cluster.getConfiguration();
- }
-
- @After
- public void teardown() throws Exception {
- }
-
- @Test
- public void test() throws IOException {
- Schema schema = SchemaBuilder.builder()
- .add("id", Type.INT4)
- .add("age", Type.INT8)
- .add("description", Type.TEXT)
- .build();
-
- TableMeta meta = CatalogUtil.newTableMeta("ROWFILE", conf);
-
- FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri());
-
- Path tablePath = new Path("/test");
- Path dataPath = new Path(tablePath, "test.tbl");
- FileSystem fs = sm.getFileSystem();
- fs.mkdirs(tablePath);
-
- Appender appender = sm.getAppender(meta, schema, dataPath);
- appender.enableStats();
- appender.init();
-
- int tupleNum = 200;
- Tuple tuple;
- Datum stringDatum = DatumFactory.createText("abcdefghijklmnopqrstuvwxyz");
- Set<Integer> idSet = Sets.newHashSet();
-
- tuple = new VTuple(3);
- long start = System.currentTimeMillis();
- for(int i = 0; i < tupleNum; i++) {
- tuple.put(0, DatumFactory.createInt4(i + 1));
- tuple.put(1, DatumFactory.createInt8(25l));
- tuple.put(2, stringDatum);
- appender.addTuple(tuple);
- idSet.add(i+1);
- }
- appender.close();
-
- TableStats stat = appender.getStats();
- assertEquals(tupleNum, stat.getNumRows().longValue());
-
- FileStatus file = fs.getFileStatus(dataPath);
- FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen());
-
- int tupleCnt = 0;
- start = System.currentTimeMillis();
- Scanner scanner = sm.getScanner(meta, schema, fragment, null);
- scanner.init();
- while ((tuple=scanner.next()) != null) {
- tupleCnt++;
- }
- scanner.close();
-
- assertEquals(tupleNum, tupleCnt);
-
- tupleCnt = 0;
- long fileStart = 0;
- long fileLen = file.getLen()/13;
-
- for (int i = 0; i < 13; i++) {
- fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen);
- scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment);
- scanner.init();
- while ((tuple=scanner.next()) != null) {
- if (!idSet.remove(tuple.getInt4(0)) && LOG.isDebugEnabled()) {
- LOG.debug("duplicated! " + tuple.getInt4(0));
- }
- tupleCnt++;
- }
- scanner.close();
- fileStart += fileLen;
- if (i == 11) {
- fileLen = file.getLen() - fileStart;
- }
- }
- assertEquals(tupleNum, tupleCnt);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 87a6e74..fce56fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -923,13 +923,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
List<Fragment> fileFragments = new ArrayList<>();
- FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri());
+ FileTablespace space = TablespaceManager.get(scanNode.getTableDesc().getUri());
for (Path path : partitionedTableScanNode.getInputPaths()) {
fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path)));
}
FragmentProto[] fragments =
- FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()]));
+ FragmentConvertor.toFragmentProtoArray(conf, fileFragments.toArray(new Fragment[fileFragments.size()]));
ctx.addFragments(scanNode.getCanonicalName(), fragments);
return new PartitionMergeScanExec(ctx, scanNode, fragments);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 3be1d36..7ab0943 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -86,7 +86,7 @@ public class BSTIndexScanExec extends ScanExec {
this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
- Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment));
+ Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(context.getConf(), fragment));
this.reader = new BSTIndex(context.getConf()).
getIndexReader(indexPath, keySchema, comparator);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 3d2de28..4ec5144 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -170,7 +170,7 @@ public class ExternalSortExec extends SortExec {
mergedInputFragments = new ArrayList<>();
for (CatalogProtos.FragmentProto proto : fragments) {
- FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+ FileFragment fragment = FragmentConvertor.convert(context.getConf(), proto);
mergedInputFragments.add(new Chunk(inSchema, fragment, scanNode.getTableDesc().getMeta()));
}
}
@@ -464,7 +464,7 @@ public class ExternalSortExec extends SortExec {
debug(LOG, "Remove intermediate memory tuples: " + chunk.getMemoryTuples().usedMem());
}
chunk.getMemoryTuples().release();
- } else if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
+ } else if (chunk.getFragment().getInputSourceId().contains(INTERMEDIATE_FILE_PREFIX)) {
localFS.delete(chunk.getFragment().getPath(), true);
numDeletedFiles++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
index 3b8317f..df143c8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
@@ -18,14 +18,15 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
public class IndexExecutorUtil {
- public static String getIndexFileName(FragmentProto fragmentProto) {
- FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, fragmentProto);
+ public static String getIndexFileName(Configuration conf, FragmentProto fragmentProto) {
+ FileFragment fileFragment = FragmentConvertor.convert(conf, fragmentProto);
StringBuilder sb = new StringBuilder();
sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
return sb.toString();
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index d1dfe40..074d0ab 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -40,6 +40,7 @@ import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import java.io.IOException;
@@ -120,7 +121,7 @@ public class PhysicalPlanUtil {
fragments.add(fileFragment);
}
}
- return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[fragments.size()]));
+ return FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()]));
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index dc48f3f..52cb080 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -97,7 +97,7 @@ public class SeqScanExec extends ScanExec {
Tuple partitionRow = null;
if (fragments != null && fragments.length > 0) {
- List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
+ List<FileFragment> fileFragments = FragmentConvertor.convert(context.getConf(), fragments);
// Get a partition key value from a given path
partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath(
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
index c5e1093..46f672d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -72,7 +72,7 @@ public class StoreIndexExec extends UnaryPhysicalExec {
TajoConf conf = context.getConf();
Path indexPath = new Path(logicalPlan.getIndexPath().toString(),
- IndexExecutorUtil.getIndexFileName(scanExec.getFragments()[0]));
+ IndexExecutorUtil.getIndexFileName(conf, scanExec.getFragments()[0]));
// TODO: Create factory using reflection
BSTIndex bst = new BSTIndex(conf);
this.comparator = new BaseTupleComparator(keySchema, sortSpecs);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 871db89..e8b8b45 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -108,7 +108,9 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true));
if (!fragments.isEmpty()) {
- FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()]));
+ FragmentProto[] fragmentProtos =
+ FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()]));
+
this.taskContext = new TaskAttemptContext(
new QueryContext(tajoConf), null,
new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index f8b89f1..f1ad931 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -20,6 +20,7 @@ package org.apache.tajo.querymaster;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -281,7 +282,7 @@ public class Task implements EventHandler<TaskEvent> {
fragmentList.add(fragment.toString());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- fragmentList.add("ERROR: " + eachFragment.getDataFormat() + "," + eachFragment.getId() + ": " + e.getMessage());
+ fragmentList.add("ERROR: " + eachFragment.getKind() + "," + eachFragment.getId() + ": " + e.getMessage());
}
}
taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()]));
@@ -330,25 +331,25 @@ public class Task implements EventHandler<TaskEvent> {
}
private void addDataLocation(Fragment fragment) {
- String[] hosts = fragment.getHosts();
- int[] diskIds = null;
+ ImmutableList<String> hosts = fragment.getHostNames();
+ Integer[] diskIds = null;
if (fragment instanceof FileFragment) {
diskIds = ((FileFragment)fragment).getDiskIds();
}
- for (int i = 0; i < hosts.length; i++) {
- dataLocations.add(new DataLocation(hosts[i], diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i]));
+ for (int i = 0; i < hosts.size(); i++) {
+ dataLocations.add(new DataLocation(hosts.get(i), diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i]));
}
}
public void addFragment(Fragment fragment, boolean useDataLocation) {
Set<FragmentProto> fragmentProtos;
- if (fragMap.containsKey(fragment.getTableName())) {
- fragmentProtos = fragMap.get(fragment.getTableName());
+ if (fragMap.containsKey(fragment.getInputSourceId())) {
+ fragmentProtos = fragMap.get(fragment.getInputSourceId());
} else {
fragmentProtos = new HashSet<>();
- fragMap.put(fragment.getTableName(), fragmentProtos);
+ fragMap.put(fragment.getInputSourceId(), fragmentProtos);
}
- fragmentProtos.add(fragment.getProto());
+ fragmentProtos.add(FragmentConvertor.toFragmentProto(systemConf, fragment));
if (useDataLocation) {
addDataLocation(fragment);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 47f1af2..1cba931 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -128,7 +128,8 @@ public class TaskAttemptContext {
@VisibleForTesting
public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId taskAttemptId,
final Fragment [] fragments, final Path workDir) {
- this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+ this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(queryContext.getConf(), fragments),
+ workDir);
}
public TajoConf getConf() {
@@ -258,12 +259,12 @@ public class TaskAttemptContext {
public void updateAssignedFragments(String tableId, Fragment[] fragments) {
fragmentMap.remove(tableId);
for(Fragment t : fragments) {
- if (fragmentMap.containsKey(t.getTableName())) {
- fragmentMap.get(t.getTableName()).add(t.getProto());
+ if (fragmentMap.containsKey(t.getInputSourceId())) {
+ fragmentMap.get(t.getInputSourceId()).add(FragmentConvertor.toFragmentProto(getConf(), t));
} else {
List<FragmentProto> frags = new ArrayList<>();
- frags.add(t.getProto());
- fragmentMap.put(t.getTableName(), frags);
+ frags.add(FragmentConvertor.toFragmentProto(getConf(), t));
+ fragmentMap.put(t.getInputSourceId(), frags);
}
}
}
@@ -281,7 +282,7 @@ public class TaskAttemptContext {
List<Path> paths = fragmentToPath(tableFragments);
for (FragmentProto eachFragment: fragments) {
- FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
+ FileFragment fileFragment = FragmentConvertor.convert(getConf(), eachFragment);
// If current attempt already has same path, we don't need to add it to fragments.
if (!paths.contains(fileFragment.getPath())) {
tableFragments.add(eachFragment);
@@ -297,7 +298,7 @@ public class TaskAttemptContext {
List<Path> list = new ArrayList<>();
for (FragmentProto proto : tableFragments) {
- FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+ FileFragment fragment = FragmentConvertor.convert(getConf(), proto);
list.add(fragment.getPath());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 4afb383..4fcd5dc 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -244,6 +244,21 @@ public abstract class Tablespace {
}
/**
+ * Returns a Scanner instance cast to SeekableScanner.
+ *
+ * @param meta The table meta
+ * @param schema The input schema
+ * @param fragment The fragment for scanning
+ * @param target The output schema
+ * @return Scanner instance
+ * @throws IOException
+ */
+ public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, Fragment fragment,
+ Schema target) throws IOException {
+ return (SeekableScanner)this.getScanner(meta, schema, fragment, target);
+ }
+
+ /**
* Returns Appender instance.
* @param queryContext Query property.
* @param taskAttemptId Task id.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
new file mode 100644
index 0000000..9c4fce5
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+public class BuiltinFragmentKinds {
+ public static final String FILE = "FILE";
+ public static final String HBASE = "HBASE";
+ public static final String JDBC = "JDBC";
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
index ac43197..a8de4ab 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
@@ -18,22 +18,150 @@
package org.apache.tajo.storage.fragment;
-import org.apache.tajo.common.ProtoObject;
+import com.google.common.collect.ImmutableList;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import java.net.URI;
-public interface Fragment extends ProtoObject<FragmentProto> {
+/**
+ * The Fragment is similar to the split in MapReduce.
+ * For distributed processing of a single large table,
+ * it contains the information of which part of data will be processed by each task.
+ *
+ * @param <T> type of fragment key. It should implement the Comparable interface.
+ */
+public abstract class Fragment<T extends Comparable> implements Comparable<Fragment<T>>, Cloneable {
- public abstract String getTableName();
+ protected String kind;
+ protected URI uri;
+ protected String inputSourceId;
+ protected T startKey;
+ protected T endKey;
+ protected long length;
+ protected ImmutableList<String> hostNames;
- @Override
- public abstract FragmentProto getProto();
+ protected Fragment(String kind,
+ URI uri,
+ String inputSourceId,
+ T startKey,
+ T endKey,
+ long length,
+ String[] hostNames) {
+ this.kind = kind;
+ this.uri = uri;
+ this.inputSourceId = inputSourceId;
+ this.startKey = startKey;
+ this.endKey = endKey;
+ this.length = length;
+ this.hostNames = hostNames == null ? ImmutableList.of() : ImmutableList.copyOf(hostNames);
+ }
+
+ /**
+ * Returns the fragment type.
+ *
+ * @return fragment type
+ */
+ public final String getKind() {
+ return kind;
+ }
+
+ /**
+ * Returns a unique URI of the input source.
+ *
+ * @return URI of the input source
+ */
+ public final URI getUri() {
+ return uri;
+ }
- public abstract long getLength();
+ /**
+ * Returns a unique id of the input source.
+ *
+ * @return id of the input source
+ */
+ public final String getInputSourceId() {
+ return this.inputSourceId;
+ }
- public abstract String getKey();
+ /**
+ * Returns the start key of the data range.
+ * {@link org.apache.tajo.storage.Scanner} will start reading data from the point indicated by this key.
+ *
+ * @return start key
+ */
+ public final T getStartKey() {
+ return startKey;
+ }
- public String[] getHosts();
+ /**
+ * Returns the end key of the data range.
+ * {@link org.apache.tajo.storage.Scanner} will stop reading data when it reaches the point indicated by this key.
+ *
+ * @return end key
+ */
+ public final T getEndKey() {
+ return endKey;
+ }
- public abstract boolean isEmpty();
+ /**
+ * Returns the length of the data range between start key and end key.
+ *
+ * @return length of the range
+ */
+ public final long getLength() {
+ return length;
+ }
+
+ /**
+ * Returns host names which have any portion of the data between start key and end key.
+ *
+ * @return host names
+ */
+ public final ImmutableList<String> getHostNames() {
+ return hostNames;
+ }
+
+ /**
+ * Indicates whether the fragment is empty.
+ * An empty fragment means that there is no data to read.
+ *
+ * @return true if the fragment is empty. Otherwise, false.
+ */
+ public boolean isEmpty() {
+ return length == 0;
+ }
+
+ /**
+ * First compares URIs of fragments, and then compares their start keys.
+ *
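+ * @param t the fragment to compare this fragment with
+ * @return 0 if both fragments have equal URIs and equal non-null start keys; a negative value if this fragment
+ * orders before the other; a positive value otherwise (a fragment with a null start key orders last).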
+ */
+ @Override
+ public final int compareTo(Fragment<T> t) {
+ int cmp = uri.compareTo(t.uri);
+ if (cmp == 0) {
+ if (startKey != null && t.startKey != null) {
+ return startKey.compareTo(t.startKey);
+ } else if (startKey == null) { // nulls last
+ return 1;
+ } else {
+ return -1;
+ }
+ } else {
+ return cmp;
+ }
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ Fragment clone = (Fragment) super.clone();
+ clone.uri = this.uri;
+ clone.inputSourceId = this.inputSourceId;
+ clone.startKey = this.startKey;
+ clone.endKey = this.endKey;
+ clone.hostNames = this.hostNames;
+ clone.length = this.length;
+ return clone;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
index 4ce6928..835a714 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
@@ -20,13 +20,12 @@ package org.apache.tajo.storage.fragment;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.annotation.ThreadSafe;
import org.apache.tajo.exception.TajoInternalError;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
@@ -34,95 +33,81 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
@ThreadSafe
public class FragmentConvertor {
- /**
- * Cache of fragment classes
- */
- protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
- /**
- * Cache of constructors for each class.
- */
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
- /**
- * default parameter for all constructors
- */
- private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class };
- public static Class<? extends Fragment> getFragmentClass(Configuration conf, String dataFormat) {
- Class<? extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(dataFormat.toLowerCase());
- if (fragmentClass == null) {
- fragmentClass = conf.getClass(
- String.format("tajo.storage.fragment.%s.class", dataFormat.toLowerCase()), null, Fragment.class);
- CACHED_FRAGMENT_CLASSES.put(dataFormat.toLowerCase(), fragmentClass);
+ private static final Map<String, FragmentSerde> SERDE_MAP = Maps.newConcurrentMap();
+
+ private static FragmentSerde getFragmentSerde(Configuration conf, String fragmentKind) {
+ fragmentKind = fragmentKind.toLowerCase();
+ FragmentSerde serde = SERDE_MAP.get(fragmentKind);
+ if (serde == null) {
+ Class<? extends FragmentSerde> serdeClass = conf.getClass(
+ String.format("tajo.storage.fragment.serde.%s", fragmentKind), null, FragmentSerde.class);
+ try {
+ serde = serdeClass.getConstructor(null).newInstance();
+ } catch (InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw new TajoInternalError(e);
+ }
+ SERDE_MAP.put(fragmentKind, serde);
}
- if (fragmentClass == null) {
- throw new TajoInternalError("No such a fragment for " + dataFormat.toLowerCase());
+ if (serde == null) {
+ throw new TajoInternalError("No such a serde for " + fragmentKind);
}
- return fragmentClass;
+ return serde;
}
- public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) {
- T result;
+ public static <T extends Fragment> T convert(Configuration conf, String fragmentKind, FragmentProto fragment) {
+ FragmentSerde serde = getFragmentSerde(conf, fragmentKind);
try {
- Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
- if (constructor == null) {
- constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
- constructor.setAccessible(true);
- CONSTRUCTOR_CACHE.put(clazz, constructor);
- }
- result = constructor.newInstance(new Object[]{fragment.getContents()});
- } catch (Throwable e) {
+ return (T) serde.deserialize(
+ serde.newBuilder()
+ .mergeFrom(fragment.getContents())
+ .build());
+ } catch (InvalidProtocolBufferException e) {
throw new TajoInternalError(e);
}
-
- return result;
}
public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment) {
- Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getDataFormat().toLowerCase());
- if (fragmentClass == null) {
- throw new TajoInternalError("No such a fragment class for " + fragment.getDataFormat());
- }
- return convert(fragmentClass, fragment);
+ return convert(conf, fragment.getKind(), fragment);
}
- public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments)
- throws IOException {
+ public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) {
List<T> list = Lists.newArrayList();
if (fragments == null) {
return list;
}
for (FragmentProto proto : fragments) {
- list.add(convert(clazz, proto));
+ list.add(convert(conf, proto));
}
return list;
}
- public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) {
- List<T> list = Lists.newArrayList();
- if (fragments == null) {
- return list;
- }
- for (FragmentProto proto : fragments) {
- list.add((T) convert(conf, proto));
- }
- return list;
+ public static FragmentProto toFragmentProto(Configuration conf, Fragment fragment) {
+ FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
+ fragmentBuilder.setId(fragment.getInputSourceId());
+ fragmentBuilder.setKind(fragment.getKind());
+ fragmentBuilder.setContents(getFragmentSerde(conf, fragment.getKind()).serialize(fragment).toByteString());
+ return fragmentBuilder.build();
}
- public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) {
+ public static List<FragmentProto> toFragmentProtoList(Configuration conf, Fragment... fragments) {
List<FragmentProto> list = Lists.newArrayList();
if (fragments == null) {
return list;
}
for (Fragment fragment : fragments) {
- list.add(fragment.getProto());
+ list.add(toFragmentProto(conf, fragment));
}
return list;
}
- public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) {
- List<FragmentProto> list = toFragmentProtoList(fragments);
+ public static FragmentProto [] toFragmentProtoArray(Configuration conf, Fragment... fragments) {
+ List<FragmentProto> list = toFragmentProtoList(conf, fragments);
return list.toArray(new FragmentProto[list.size()]);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java
new file mode 100644
index 0000000..c570c9c
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.GeneratedMessage.Builder;
+
+/**
+ * FragmentSerde abstracts how a fragment is serialized / deserialized to / from a Protocol Buffer message.
+ *
+ * @param <F> Fragment class
+ * @param <P> Protocol Buffer Message class corresponding to the Fragment class
+ */
+public interface FragmentSerde<F extends Fragment, P extends Message> {
+
+ /**
+ * Creates a new builder for the Protocol Buffer message type {@code P}.
+ *
+ * @return a Protocol Buffer message builder
+ */
+ Builder newBuilder();
+
+ /**
+ * Serializes a fragment into a Protocol Buffer message.
+ *
+ * @param fragment the fragment to serialize
+ * @return a serialized Protocol Buffer message
+ */
+ P serialize(F fragment);
+
+ /**
+ * Deserializes a Protocol Buffer message to a fragment.
+ *
+ * @param proto the Protocol Buffer message to deserialize
+ * @return a deserialized fragment instance
+ */
+ F deserialize(P proto);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index 2454714..4e57204 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -44,52 +44,28 @@
<!--- Fragment Class Configurations -->
<property>
- <name>tajo.storage.fragment.text.class</name>
+ <name>tajo.storage.fragment.kind.file</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.json.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.raw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.draw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.rcfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.row.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.parquet.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.orc.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.sequencefile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.avro.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.serde.file</name>
+ <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+ <name>tajo.storage.fragment.serde.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.jdbc.class</name>
- <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
+ <name>tajo.storage.fragment.serde.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragmentSerde</value>
</property>
<!--- Scanner Handler -->
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index 1c4530a..5bf6b0b 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -43,52 +43,28 @@
<!--- Fragment Class Configurations -->
<property>
- <name>tajo.storage.fragment.text.class</name>
+ <name>tajo.storage.fragment.kind.file</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.json.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.raw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.draw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.rcfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.row.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.parquet.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.orc.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.sequencefile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.avro.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.serde.file</name>
+ <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+ <name>tajo.storage.fragment.serde.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.jdbc.class</name>
- <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
+ <name>tajo.storage.fragment.serde.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragmentSerde</value>
</property>
<!--- Scanner Handler -->
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
index 18aa515..e1026bb 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
@@ -19,112 +19,49 @@
package org.apache.tajo.storage.hbase;
import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tajo.BuiltinStorages;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto;
+import org.apache.tajo.storage.hbase.HBaseFragment.HBaseFragmentKey;
import java.net.URI;
-public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
- @Expose
- private URI uri;
- @Expose
- private String tableName;
- @Expose
+/**
+ * Fragment that covers a start-row / stop-row key range of an HBase table.
+ */
+public class HBaseFragment extends Fragment<HBaseFragmentKey> {
private String hbaseTableName;
- @Expose
- private byte[] startRow;
- @Expose
- private byte[] stopRow;
- @Expose
- private String regionLocation;
- @Expose
private boolean last;
- @Expose
- private long length;
public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow,
String regionLocation) {
- this.uri = uri;
- this.tableName = tableName;
+ super(BuiltinFragmentKinds.HBASE, uri, tableName, new HBaseFragmentKey(startRow), new HBaseFragmentKey(stopRow),
+ TajoConstants.UNKNOWN_LENGTH, new String[]{regionLocation});
+
this.hbaseTableName = hbaseTableName;
- this.startRow = startRow;
- this.stopRow = stopRow;
- this.regionLocation = regionLocation;
this.last = false;
}
- public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException {
- HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
- builder.mergeFrom(raw);
- builder.build();
- init(builder.build());
- }
-
- private void init(HBaseFragmentProto proto) {
- this.uri = URI.create(proto.getUri());
- this.tableName = proto.getTableName();
- this.hbaseTableName = proto.getHbaseTableName();
- this.startRow = proto.getStartRow().toByteArray();
- this.stopRow = proto.getStopRow().toByteArray();
- this.regionLocation = proto.getRegionLocation();
- this.length = proto.getLength();
- this.last = proto.getLast();
- }
-
- @Override
- public int compareTo(HBaseFragment t) {
- return Bytes.compareTo(startRow, t.startRow);
- }
-
- public URI getUri() {
- return uri;
- }
-
- @Override
- public String getTableName() {
- return tableName;
- }
-
- @Override
- public String getKey() {
- return new String(startRow);
+ public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow,
+ String regionLocation, boolean last) {
+ this(uri, tableName, hbaseTableName, startRow, stopRow, regionLocation);
+ this.last = last;
}
@Override
public boolean isEmpty() {
- return startRow == null || stopRow == null;
- }
-
- @Override
- public long getLength() {
- return length;
+ return startKey.isEmpty() || endKey.isEmpty();
}
public void setLength(long length) {
this.length = length;
}
- @Override
- public String[] getHosts() {
- return new String[] {regionLocation};
- }
-
public Object clone() throws CloneNotSupportedException {
HBaseFragment frag = (HBaseFragment) super.clone();
- frag.uri = uri;
- frag.tableName = tableName;
frag.hbaseTableName = hbaseTableName;
- frag.startRow = startRow;
- frag.stopRow = stopRow;
- frag.regionLocation = regionLocation;
frag.last = last;
- frag.length = length;
return frag;
}
@@ -132,9 +69,9 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
public boolean equals(Object o) {
if (o instanceof HBaseFragment) {
HBaseFragment t = (HBaseFragment) o;
- if (tableName.equals(t.tableName)
- && Bytes.equals(startRow, t.startRow)
- && Bytes.equals(stopRow, t.stopRow)) {
+ if (inputSourceId.equals(t.inputSourceId)
+ && startKey.equals(t.startKey)
+ && endKey.equals(t.endKey)) {
return true;
}
}
@@ -143,51 +80,19 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
@Override
public int hashCode() {
- return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow);
+ return Objects.hashCode(inputSourceId, hbaseTableName, startKey, endKey);
}
@Override
public String toString() {
return
- "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ tableName +
+ "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ inputSourceId +
"\", hbaseTableName\": \"" + hbaseTableName + "\"" +
- ", \"startRow\": \"" + new String(startRow) + "\"" +
- ", \"stopRow\": \"" + new String(stopRow) + "\"" +
+ ", \"startRow\": \"" + new String(startKey.bytes) + "\"" +
+ ", \"stopRow\": \"" + new String(endKey.bytes) + "\"" +
", \"length\": \"" + length + "\"}" ;
}
- @Override
- public FragmentProto getProto() {
- HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
- builder
- .setUri(uri.toString())
- .setTableName(tableName)
- .setHbaseTableName(hbaseTableName)
- .setStartRow(ByteString.copyFrom(startRow))
- .setStopRow(ByteString.copyFrom(stopRow))
- .setLast(last)
- .setLength(length)
- .setRegionLocation(regionLocation);
-
- FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
- fragmentBuilder.setId(this.tableName);
- fragmentBuilder.setContents(builder.buildPartial().toByteString());
- fragmentBuilder.setDataFormat(BuiltinStorages.HBASE);
- return fragmentBuilder.build();
- }
-
- public byte[] getStartRow() {
- return startRow;
- }
-
- public byte[] getStopRow() {
- return stopRow;
- }
-
- public String getRegionLocation() {
- return regionLocation;
- }
-
public boolean isLast() {
return last;
}
@@ -200,15 +105,51 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
return hbaseTableName;
}
- public void setHbaseTableName(String hbaseTableName) {
- this.hbaseTableName = hbaseTableName;
- }
-
public void setStartRow(byte[] startRow) {
- this.startRow = startRow;
+ this.startKey = new HBaseFragmentKey(startRow);
}
public void setStopRow(byte[] stopRow) {
- this.stopRow = stopRow;
+ this.endKey = new HBaseFragmentKey(stopRow);
+ }
+
+ public static class HBaseFragmentKey implements Comparable<HBaseFragmentKey> {
+ private final byte[] bytes;
+
+ public HBaseFragmentKey(byte[] key) {
+ this.bytes = key;
+ }
+
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(bytes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof HBaseFragmentKey) {
+ HBaseFragmentKey other = (HBaseFragmentKey) o;
+ return Bytes.equals(bytes, other.bytes);
+ }
+ return false;
+ }
+
+ @Override
+ public int compareTo(HBaseFragmentKey o) {
+ return Bytes.compareTo(bytes, o.bytes);
+ }
+
+ @Override
+ public String toString() {
+ return new String(bytes);
+ }
+
+ public boolean isEmpty() {
+ return this.bytes == null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java
new file mode 100644
index 0000000..f896f43
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage.Builder;
+import org.apache.tajo.storage.fragment.FragmentSerde;
+import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto;
+
+import java.net.URI;
+
+public class HBaseFragmentSerde implements FragmentSerde<HBaseFragment, HBaseFragmentProto> {
+
+ @Override
+ public Builder newBuilder() {
+ return HBaseFragmentProto.newBuilder();
+ }
+
+ @Override
+ public HBaseFragmentProto serialize(HBaseFragment fragment) {
+ return HBaseFragmentProto.newBuilder()
+ .setUri(fragment.getUri().toASCIIString())
+ .setTableName(fragment.getInputSourceId())
+ .setHbaseTableName(fragment.getHbaseTableName())
+ .setStartRow(ByteString.copyFrom(fragment.getStartKey().getBytes()))
+ .setStopRow(ByteString.copyFrom(fragment.getEndKey().getBytes()))
+ .setLast(fragment.isLast())
+ .setLength(fragment.getLength())
+ .setRegionLocation(fragment.getHostNames().get(0))
+ .build();
+ }
+
+ @Override
+ public HBaseFragment deserialize(HBaseFragmentProto proto) {
+ return new HBaseFragment(
+ URI.create(proto.getUri()),
+ proto.getTableName(),
+ proto.getHbaseTableName(),
+ proto.getStartRow().toByteArray(),
+ proto.getStopRow().toByteArray(),
+ proto.getRegionLocation(),
+ proto.getLast());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index b2ca02d..781f911 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -36,10 +36,16 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.exception.*;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BytesUtils;
@@ -175,16 +181,16 @@ public class HBaseScanner implements Scanner {
}
}
- scan.setStartRow(fragment.getStartRow());
- if (fragment.isLast() && fragment.getStopRow() != null &&
- fragment.getStopRow().length > 0) {
+ scan.setStartRow(fragment.getStartKey().getBytes());
+ if (fragment.isLast() && !fragment.getEndKey().isEmpty() &&
+ fragment.getEndKey().getBytes().length > 0) {
// last and stopRow is not empty
if (filters == null) {
filters = new FilterList();
}
- filters.addFilter(new InclusiveStopFilter(fragment.getStopRow()));
+ filters.addFilter(new InclusiveStopFilter(fragment.getEndKey().getBytes()));
} else {
- scan.setStopRow(fragment.getStopRow());
+ scan.setStopRow(fragment.getEndKey().getBytes());
}
if (filters != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 0cf883e..e3f7c25 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -525,10 +525,10 @@ public class HBaseTablespace extends Tablespace {
if (fragmentMap.containsKey(regionStartKey)) {
final HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
- if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
+ if (Bytes.compareTo(fragmentStart, prevFragment.getStartKey().getBytes()) < 0) {
prevFragment.setStartRow(fragmentStart);
}
- if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
+ if (Bytes.compareTo(fragmentStop, prevFragment.getEndKey().getBytes()) > 0) {
prevFragment.setStopRow(fragmentStop);
}
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index f3cb9a5..17c413e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -453,8 +453,8 @@ public class FileTablespace extends Tablespace {
/**
* Get Disk Ids by Volume Bytes
*/
- private int[] getDiskIds(VolumeId[] volumeIds) {
- int[] diskIds = new int[volumeIds.length];
+ private Integer[] getDiskIds(VolumeId[] volumeIds) {
+ Integer[] diskIds = new Integer[volumeIds.length];
for (int i = 0; i < volumeIds.length; i++) {
int diskId = -1;
if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 6d8443e..a6850c1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -92,7 +92,7 @@ public class RawFile {
fis = new FileInputStream(file);
channel = fis.getChannel();
filePosition = startOffset = fragment.getStartKey();
- endOffset = fragment.getStartKey() + fragment.getLength();
+ endOffset = fragment.getEndKey();
if (LOG.isDebugEnabled()) {
LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size()