You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:12 UTC
[14/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/HDFSSeekableLineReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/HDFSSeekableLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/HDFSSeekableLineReader.java
deleted file mode 100644
index 7916a16..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/HDFSSeekableLineReader.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-
-/**
- * A class that provides a line reader from an input stream which also allows performing seek operations
- */
-public class HDFSSeekableLineReader {
- private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
- private int bufferSize = DEFAULT_BUFFER_SIZE;
- private FSDataInputStream reader;
-
- private byte[] buffer;
- // the number of bytes of real data in the buffer
- private int bufferLength = 0;
- // the current position in the buffer
- private int bufferPosn = 0;
-
- private long currentFilePos = 0L;
-
- private static final byte CR = '\r';
- private static final byte LF = '\n';
-
- public static final String KEY_BUFFER_SIZE = "io.file.buffer.size";
-
- /**
- * Create a line reader that reads from the given stream using the
- * default buffer-size (32k).
- *
- * @param in
- * The input stream
- * @throws IOException
- */
- public HDFSSeekableLineReader(FSDataInputStream in) throws IOException {
- this(in, DEFAULT_BUFFER_SIZE);
- }
-
- /**
- * Create a line reader that reads from the given stream using the
- * given buffer-size.
- *
- * @param in
- * The input stream
- * @param bufferSize
- * Size of the read buffer
- * @throws IOException
- */
- public HDFSSeekableLineReader(FSDataInputStream in, int bufferSize) throws IOException {
- this.reader = in;
- this.bufferSize = bufferSize;
- this.buffer = new byte[this.bufferSize];
- currentFilePos = in.getPos();
- }
-
- public HDFSSeekableLineReader() throws IOException {
- this.bufferSize = DEFAULT_BUFFER_SIZE;
- this.buffer = new byte[this.bufferSize];
- }
-
- /**
- * Create a line reader that reads from the given stream using the <code>io.file.buffer.size</code> specified in the given <code>Configuration</code>.
- *
- * @param in
- * input stream
- * @param conf
- * configuration
- * @throws IOException
- */
- public HDFSSeekableLineReader(FSDataInputStream in, Configuration conf) throws IOException {
- this(in, conf.getInt(KEY_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
- }
-
- /**
- * Read one line from the InputStream into the given Text. A line
- * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
- * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
- * line.
- *
- * @param str
- * the object to store the given line (without newline)
- * @param maxLineLength
- * the maximum number of bytes to store into str;
- * the rest of the line is silently discarded.
- * @param maxBytesToConsume
- * the maximum number of bytes to consume
- * in this call. This is only a hint, because if the line cross
- * this threshold, we allow it to happen. It can overshoot
- * potentially by as much as one buffer length.
- * @return the number of bytes read including the (longest) newline
- * found.
- * @throws IOException
- * if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
- /* We're reading data from in, but the head of the stream may be
- * already buffered in buffer, so we have several cases:
- * 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
- * 2. An unambiguously terminated line is in buffer, so we just
- * copy to str.
- * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to str, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
- * We use a flag prevCharCR to signal if previous character was CR
- * and, if it happens to be at the end of the buffer, delay
- * consuming it until we have a chance to look at the char that
- * follows.
- */
- str.clear();
- int txtLength = 0; //tracks str.getLength(), as an optimization
- int newlineLength = 0; //length of terminating newline
- boolean prevCharCR = false; //true if prev char was CR
- long bytesConsumed = 0;
- do {
- int startPosn = bufferPosn; //starting from where we left off the last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- if (prevCharCR)
- ++bytesConsumed; //account for CR from previous read
- bufferLength = reader.read(buffer);
- if (bufferLength <= 0)
- break; // EOF
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (buffer[bufferPosn] == LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- }
- if (prevCharCR) { //CR + notLF, we are at notLF
- newlineLength = 1;
- break;
- }
- prevCharCR = (buffer[bufferPosn] == CR);
- }
- int readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0)
- --readLength; //CR at the end of the buffer
- bytesConsumed += readLength;
- int appendLength = readLength - newlineLength;
- if (appendLength > maxLineLength - txtLength) {
- appendLength = maxLineLength - txtLength;
- }
- if (appendLength > 0) {
- str.append(buffer, startPosn, appendLength);
- txtLength += appendLength;
- }
- } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
- if (bytesConsumed > (long) Integer.MAX_VALUE)
- throw new IOException("Too many bytes before newline: " + bytesConsumed);
- currentFilePos = reader.getPos() - bufferLength + bufferPosn;
- return (int) bytesConsumed;
- }
-
- /**
- * Read from the InputStream into the given Text.
- *
- * @param str
- * the object to store the given line
- * @param maxLineLength
- * the maximum number of bytes to store into str.
- * @return the number of bytes read including the newline
- * @throws IOException
- * if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength) throws IOException {
- return readLine(str, maxLineLength, Integer.MAX_VALUE);
- }
-
- /**
- * Read from the InputStream into the given Text.
- *
- * @param str
- * the object to store the given line
- * @return the number of bytes read including the newline
- * @throws IOException
- * if the underlying stream throws
- */
- public int readLine(Text str) throws IOException {
- return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
- }
-
- public void seek(long desired) throws IOException {
- if (reader.getPos() <= desired || currentFilePos > desired) {
- // desired position is ahead of stream or before the current position, seek to position
- reader.seek(desired);
- bufferLength = 0;
- bufferPosn = 0;
- currentFilePos = desired;
- } else if (currentFilePos < desired) {
- // desired position is in the buffer
- int difference = (int) (desired - currentFilePos);
- bufferPosn += difference;
- currentFilePos = desired;
- }
- }
-
- public FSDataInputStream getReader() {
- return reader;
- }
-
- public void resetReader(FSDataInputStream reader) throws IOException {
- this.reader = reader;
- bufferLength = 0;
- bufferPosn = 0;
- currentFilePos = reader.getPos();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/ILookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/ILookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/ILookupReader.java
deleted file mode 100644
index d48aaf7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/ILookupReader.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-public interface ILookupReader {
- public Object read(int fileNumber, long recordOffset) throws Exception;
- public void close();
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
deleted file mode 100644
index 50853d4..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-
-//Used in two cases:
-//1. building an index over a dataset
-//2. performing full scan over a dataset that has built index (to provide consistent view) with RCFile format
-
-@SuppressWarnings("rawtypes")
-public class RCFileDataReader extends AbstractHDFSReader {
-
- private RecordReader reader;
- private Object key;
- private Object value;
- private int currentSplitIndex = 0;
- private String fileName;
- private long recordGroupOffset;
- private long nextRecordGroupOffset;
- private boolean executed[];
- private InputSplit[] inputSplits;
- private String[] readSchedule;
- private String nodeName;
- private JobConf conf;
- private List<ExternalFile> files;
- private FileSystem hadoopFS;
-
- public RCFileDataReader(InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
- boolean executed[], List<ExternalFile> files) throws IOException {
- this.executed = executed;
- this.inputSplits = inputSplits;
- this.readSchedule = readSchedule;
- this.nodeName = nodeName;
- this.conf = conf;
- this.files = files;
- hadoopFS = FileSystem.get(conf);
- }
-
- private boolean moveToNext() throws IOException {
- for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
- /**
- * read all the partitions scheduled to the current node
- */
- if (readSchedule[currentSplitIndex].equals(nodeName)) {
- /**
- * pick an unread split to read synchronize among
- * simultaneous partitions in the same machine
- */
- synchronized (executed) {
- if (executed[currentSplitIndex] == false) {
- executed[currentSplitIndex] = true;
- } else {
- continue;
- }
- }
-
- /**
- * read the split
- */
- try {
- if (files != null) {
- fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
- FileStatus fileStatus = hadoopFS.getFileStatus(new Path(fileName));
- //skip if not the same file stored in the files snapshot
- if (fileStatus.getModificationTime() != files.get(currentSplitIndex).getLastModefiedTime()
- .getTime())
- continue;
- }
- reader = getRecordReader(currentSplitIndex);
- recordGroupOffset = -1;
- nextRecordGroupOffset = reader.getPos();
- } catch (Exception e) {
- continue;
- }
- key = reader.createKey();
- value = reader.createValue();
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- throw new NotImplementedException("Use readNext()");
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use readNext()");
- }
-
- private RecordReader getRecordReader(int slitIndex) throws IOException {
- RecordReader reader;
- try {
- reader = conf.getInputFormat().getRecordReader(inputSplits[slitIndex], conf, getReporter());
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- return reader;
- }
-
- @Override
- public boolean initialize() throws IOException {
- return moveToNext();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Object readNext() throws IOException {
-
- if (reader == null) {
- return null;
- }
- if (reader.next(key, value)) {
- if (reader.getPos() != nextRecordGroupOffset) {
- recordGroupOffset = nextRecordGroupOffset;
- nextRecordGroupOffset = reader.getPos();
- }
- return value;
- }
- while (moveToNext()) {
- if (reader.next(key, value)) {
- if (reader.getPos() != nextRecordGroupOffset) {
- recordGroupOffset = nextRecordGroupOffset;
- nextRecordGroupOffset = reader.getPos();
- }
- return value;
- }
- }
- return null;
- }
-
- @Override
- public String getFileName() throws Exception {
- return files.get(currentSplitIndex).getFileName();
- }
-
- @Override
- public long getReaderPosition() throws Exception {
- return recordGroupOffset;
- }
-
- @Override
- public int getFileNumber() throws Exception {
- return files.get(currentSplitIndex).getFileNumber();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
deleted file mode 100644
index f312228..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-public class RCFileLookupReader {
- private FileSystem fs;
- private Configuration conf;
- private int fileNumber = -1;
- private int rowNumber;
- private long recordGroupOffset;
- private Reader reader;
- boolean skipFile = false;
- private LongWritable rcKey = new LongWritable();
- private BytesRefArrayWritable rcValue = new BytesRefArrayWritable();
- private ExternalFile currentFile = new ExternalFile(null, null, 0, null, null, 0L,
- ExternalFilePendingOp.PENDING_NO_OP);
- private ExternalFileIndexAccessor filesIndexAccessor;
-
- public RCFileLookupReader(ExternalFileIndexAccessor filesIndexAccessor, Configuration conf) throws IOException {
- fs = FileSystem.get(conf);
- this.conf = conf;
- this.filesIndexAccessor = filesIndexAccessor;
- }
-
- public Writable read(int fileNumber, long recordGroupOffset, int rowNumber) throws Exception {
- if (fileNumber != this.fileNumber) {
- filesIndexAccessor.searchForFile(fileNumber, currentFile);
- try {
- FileStatus fileStatus = fs.getFileStatus(new Path(currentFile.getFileName()));
- if (fileStatus.getModificationTime() != currentFile.getLastModefiedTime().getTime()) {
- this.fileNumber = fileNumber;
- skipFile = true;
- return null;
- } else {
- this.fileNumber = fileNumber;
- skipFile = false;
- }
- } catch (FileNotFoundException e) {
- // Couldn't find file, skip it
- this.fileNumber = fileNumber;
- skipFile = true;
- return null;
- }
- // Close old file and open new one
- if (reader != null)
- reader.close();
- reader = new Reader(fs, new Path(currentFile.getFileName()), conf);
- this.recordGroupOffset = -1;
- this.rowNumber = -1;
- } else if (skipFile) {
- return null;
- }
- // Seek to the record group if needed
- if (recordGroupOffset != this.recordGroupOffset) {
- this.recordGroupOffset = recordGroupOffset;
- if (reader.getPosition() != recordGroupOffset)
- reader.seek(recordGroupOffset);
- reader.resetBuffer();
- this.rowNumber = -1;
- }
-
- // skip rows to the record row
- while (this.rowNumber < rowNumber) {
- reader.next(rcKey);
- reader.getCurrentRow(rcValue);
- this.rowNumber++;
- }
- return rcValue;
- }
-
- public void close() throws Exception {
- if (reader != null)
- reader.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
deleted file mode 100644
index e787921..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-@SuppressWarnings("deprecation")
-public class SequenceFileLookupInputStream extends AbstractHDFSLookupInputStream {
-
- private SequenceFile.Reader reader;
- private Writable seqKey;
- private Text seqValue = new Text();
- private Configuration conf;
-
- public SequenceFileLookupInputStream(ExternalFileIndexAccessor fileIndexAccessor, JobConf conf) throws IOException {
- super(fileIndexAccessor, conf);
- this.conf = conf;
- }
-
- @Override
- protected void openFile(String fileName) throws IOException {
- if (reader != null) {
- reader.close();
- }
- reader = new SequenceFile.Reader(fs, new Path(fileName), conf);
- seqKey = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- }
-
- @Override
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
- super.close();
- }
-
- @Override
- protected boolean read(long recordOffset) {
- try {
- reader.seek(recordOffset);
- reader.next(seqKey, seqValue);
- pendingValue = seqValue.toString();
- return true;
- } catch (IOException e) {
- // Same Question: seek and read failed after opening the file succeeded; should we do something about it?
- e.printStackTrace();
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
deleted file mode 100644
index 76b3660..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class SequenceFileLookupReader implements ILookupReader {
-
- private Reader reader;
- private Writable key;
- private Writable value;
- private FileSystem fs;
- private int fileNumber = -1;
- private boolean skipFile = false;
- private ExternalFile file = new ExternalFile(null, null, 0, null, null, 0L, ExternalFilePendingOp.PENDING_NO_OP);
- private ExternalFileIndexAccessor filesIndexAccessor;
- private Configuration conf;
-
- public SequenceFileLookupReader(ExternalFileIndexAccessor filesIndexAccessor, Configuration conf)
- throws IOException {
- fs = FileSystem.get(conf);
- this.filesIndexAccessor = filesIndexAccessor;
- this.conf = conf;
- }
-
- @Override
- public Writable read(int fileNumber, long recordOffset) throws Exception {
- if (fileNumber != this.fileNumber) {
- //get file name
- this.fileNumber = fileNumber;
- filesIndexAccessor.searchForFile(fileNumber, file);
- try {
- FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
- if (fileStatus.getModificationTime() != file.getLastModefiedTime().getTime()) {
- this.fileNumber = fileNumber;
- skipFile = true;
- return null;
- } else {
- this.fileNumber = fileNumber;
- skipFile = false;
- openFile(file.getFileName());
- }
- } catch (FileNotFoundException e) {
- // file was not found, do nothing and skip its tuples
- this.fileNumber = fileNumber;
- skipFile = true;
- return null;
- }
- } else if (skipFile) {
- return null;
- }
- reader.seek(recordOffset);
- reader.next(key, value);
- return value;
- }
-
- @SuppressWarnings("deprecation")
- private void openFile(String FileName) throws IOException {
- if (reader != null)
- try {
- reader.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- reader = new SequenceFile.Reader(fs, new Path(FileName), conf);
- key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- }
-
- @Override
- public void close() {
- if (reader != null)
- try {
- reader.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
deleted file mode 100644
index ea82c18..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-
-public class TextFileLookupInputStream extends AbstractHDFSLookupInputStream {
-
- private HDFSSeekableLineReader lineReader = new HDFSSeekableLineReader();
- private Text value = new Text();
-
- public TextFileLookupInputStream(ExternalFileIndexAccessor filesIndexAccessor, JobConf conf) throws IOException {
- super(filesIndexAccessor, conf);
- }
-
- @Override
- public void openFile(String FileName) throws IOException {
- if (lineReader.getReader() != null) {
- lineReader.getReader().close();
- }
- lineReader.resetReader(fs.open(new Path(FileName)));
- }
-
- @Override
- public void close() {
- if (lineReader.getReader() != null) {
- try {
- lineReader.getReader().close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- protected boolean read(long recordOffset) {
- try {
- lineReader.seek(recordOffset);
- lineReader.readLine(value);
- pendingValue = value.toString();
- return true;
- } catch (IOException e) {
- // file was opened, but the subsequent seek and read failed; should we throw an exception here?
- e.printStackTrace();
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
deleted file mode 100644
index 5864df2..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-
-public class TextFileLookupReader implements ILookupReader {
- private FileSystem fs;
- private int fileNumber = -1;
- private boolean skipFile = false;
- private ExternalFile file = new ExternalFile(null, null, 0, null, null, 0L, ExternalFilePendingOp.PENDING_NO_OP);
- private ExternalFileIndexAccessor filesIndexAccessor;
- private HDFSSeekableLineReader lineReader;
- private Text value = new Text();
-
- public TextFileLookupReader(ExternalFileIndexAccessor filesIndexAccessor, Configuration conf) throws IOException {
- this.fs = FileSystem.get(conf);
- this.filesIndexAccessor = filesIndexAccessor;
- this.lineReader = new HDFSSeekableLineReader();
- }
-
- @Override
- public String read(int fileNumber, long recordOffset) throws Exception {
- if (fileNumber != this.fileNumber) {
- this.fileNumber = fileNumber;
- filesIndexAccessor.searchForFile(fileNumber, file);
-
- try {
- FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
- if (fileStatus.getModificationTime() != file.getLastModefiedTime().getTime()) {
- this.fileNumber = fileNumber;
- skipFile = true;
- return null;
- } else {
- this.fileNumber = fileNumber;
- skipFile = false;
- openFile(file.getFileName());
- }
- } catch (FileNotFoundException e) {
- // File is not there, skip it and do nothing
- this.fileNumber = fileNumber;
- skipFile = true;
- return null;
- }
- } else if (skipFile) {
- return null;
- }
- lineReader.seek(recordOffset);
- lineReader.readLine(value);
- return value.toString();
- }
-
- private void openFile(String FileName) throws IOException {
- if (lineReader.getReader() != null) {
- lineReader.getReader().close();
- }
- lineReader.resetReader(fs.open(new Path(FileName)));
- }
-
- @Override
- public void close() {
- if (lineReader.getReader() != null) {
- try {
- lineReader.getReader().close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
deleted file mode 100644
index 5e4f013..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-
-// Used in two cases:
-// 1. building an index over a dataset
-// 2. performing full scan over a dataset that has built index (to provide consistent view)
-
-@SuppressWarnings("rawtypes")
-public class TextualDataReader extends AbstractHDFSReader {
-
- private RecordReader<Object, Text> reader;
- private Object key;
- private Text value;
- private boolean hasMore = false;
- private int EOL = "\n".getBytes()[0];
- private Text pendingValue = null;
- private int currentSplitIndex = 0;
- private String fileName;
- private long recordOffset;
- private boolean executed[];
- private InputSplit[] inputSplits;
- private String[] readSchedule;
- private String nodeName;
- private JobConf conf;
- private List<ExternalFile> files;
- private FileSystem hadoopFS;
-
- public TextualDataReader(InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
- boolean executed[], List<ExternalFile> files) throws IOException {
- this.executed = executed;
- this.inputSplits = inputSplits;
- this.readSchedule = readSchedule;
- this.nodeName = nodeName;
- this.conf = conf;
- this.files = files;
- hadoopFS = FileSystem.get(conf);
- }
-
- @Override
- public boolean initialize() throws Exception {
- return moveToNext();
- }
-
- @Override
- public Object readNext() throws Exception {
- if (reader == null) {
- return null;
- }
- recordOffset = reader.getPos();
- if (reader.next(key, value)) {
- return value;
- }
- while (moveToNext()) {
- recordOffset = reader.getPos();
- if (reader.next(key, value)) {
- return value;
- }
- }
- return null;
- }
-
- @Override
- public int getFileNumber() throws Exception {
- return files.get(currentSplitIndex).getFileNumber();
- }
-
- @Override
- public String getFileName() throws Exception {
- return files.get(currentSplitIndex).getFileName();
- }
-
- @Override
- public long getReaderPosition() throws Exception {
- return recordOffset;
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
- }
-
- @SuppressWarnings("unchecked")
- private boolean moveToNext() throws IOException {
- for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
- /**
- * read all the partitions scheduled to the current node
- */
- if (readSchedule[currentSplitIndex].equals(nodeName)) {
- /**
- * pick an unread split to read synchronize among
- * simultaneous partitions in the same machine
- */
- synchronized (executed) {
- if (executed[currentSplitIndex] == false) {
- executed[currentSplitIndex] = true;
- } else {
- continue;
- }
- }
-
- /**
- * read the split
- */
- try {
- if (files != null) {
- fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
- FileStatus fileStatus = hadoopFS.getFileStatus(new Path(fileName));
- // Skip if not the same file stored in the files snapshot
- if (fileStatus.getModificationTime() != files.get(currentSplitIndex).getLastModefiedTime()
- .getTime())
- continue;
- }
- // It is the same file
- reader = getRecordReader(currentSplitIndex);
- } catch (Exception e) {
- // ignore exceptions <-- This might change later -->
- continue;
- }
- key = reader.createKey();
- value = reader.createValue();
- return true;
- }
- }
- return false;
- }
-
- private RecordReader getRecordReader(int splitIndex) throws IOException {
- RecordReader reader;
- if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
- SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- reader = format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
- } else {
- TextInputFormat format = (TextInputFormat) conf.getInputFormat();
- reader = format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
- }
- return reader;
- }
-
- // Return one record at a time <to preserve the indexing information>
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if (reader == null) {
- if (!moveToNext()) {
- // nothing to read
- return -1;
- }
- }
-
- int numBytes = 0;
- if (pendingValue != null) {
- int sizeOfNextTuple = pendingValue.getLength() + 1;
- if (sizeOfNextTuple > len) {
- return 0;
- }
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
- pendingValue = null;
- return numBytes;
- }
- if (numBytes < len) {
- //store the byte location
- recordOffset = reader.getPos();
- hasMore = reader.next(key, value);
- if (!hasMore) {
- while (moveToNext()) {
- //store the byte location
- recordOffset = reader.getPos();
- hasMore = reader.next(key, value);
- if (hasMore) {
- //return the value read
- int sizeOfNextTuple = value.getLength() + 1;
- if (numBytes + sizeOfNextTuple > len) {
- pendingValue = value;
- return 0;
- } else {
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- return numBytes;
- }
- }
- }
- return -1;
- } else {
- //return the value read
- int sizeOfNextTuple = value.getLength() + 1;
- if (numBytes + sizeOfNextTuple > len) {
- pendingValue = value;
- return 0;
- } else {
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- return numBytes;
- }
- }
- }
- return numBytes;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
deleted file mode 100644
index 9fe09a2..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.input;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.Counters.Counter;
-
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-
-public class TextualFullScanDataReader extends InputStream {
-
- private RecordReader<Object, Text> reader;
- private Object key;
- private Text value;
- private boolean hasMore = false;
- private int EOL = "\n".getBytes()[0];
- private Text pendingValue = null;
- private int currentSplitIndex = 0;
- private boolean executed[];
- private InputSplit[] inputSplits;
- private String[] readSchedule;
- private String nodeName;
- private JobConf conf;
-
- public TextualFullScanDataReader(boolean executed[], InputSplit[] inputSplits, String[] readSchedule,
- String nodeName, JobConf conf) {
- this.executed = executed;
- this.inputSplits = inputSplits;
- this.readSchedule = readSchedule;
- this.nodeName = nodeName;
- this.conf = conf;
- }
-
- @Override
- public int available() {
- return 1;
- }
-
- @SuppressWarnings("unchecked")
- private boolean moveToNext() throws IOException {
- for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
- /**
- * read all the partitions scheduled to the current node
- */
- if (readSchedule[currentSplitIndex].equals(nodeName)) {
- /**
- * pick an unread split to read synchronize among
- * simultaneous partitions in the same machine
- */
- synchronized (executed) {
- if (executed[currentSplitIndex] == false) {
- executed[currentSplitIndex] = true;
- } else {
- continue;
- }
- }
-
- /**
- * read the split
- */
- reader = getRecordReader(currentSplitIndex);
- key = reader.createKey();
- value = (Text) reader.createValue();
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if (reader == null) {
- if (!moveToNext()) {
- // nothing to read
- return -1;
- }
- }
-
- int numBytes = 0;
- if (pendingValue != null) {
- int sizeOfNextTuple = pendingValue.getLength() + 1;
- if (sizeOfNextTuple > len) {
- return 0;
- }
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
- pendingValue = null;
- }
-
- while (numBytes < len) {
- hasMore = reader.next(key, value);
- if (!hasMore) {
- while (moveToNext()) {
- hasMore = reader.next(key, value);
- if (hasMore) {
- // move to the next non-empty split
- break;
- }
- }
- }
- if (!hasMore) {
- return (numBytes == 0) ? -1 : numBytes;
- }
- int sizeOfNextTuple = value.getLength() + 1;
- if (numBytes + sizeOfNextTuple > len) {
- // cannot add tuple to current buffer
- // but the reader has moved pass the fetched tuple
- // we need to store this for a subsequent read call.
- // and return this then.
- pendingValue = value;
- break;
- } else {
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- }
- }
- return numBytes;
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
- }
-
- @SuppressWarnings("rawtypes")
- private RecordReader getRecordReader(int splitIndex) throws IOException {
- if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
- SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex],
- conf, getReporter());
- return reader;
- } else {
- TextInputFormat format = (TextInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex],
- conf, getReporter());
- return reader;
- }
- }
-
- private Reporter getReporter() {
- Reporter reporter = new Reporter() {
-
- @Override
- public Counter getCounter(Enum<?> arg0) {
- return null;
- }
-
- @Override
- public Counter getCounter(String arg0, String arg1) {
- return null;
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
-
- @Override
- public void incrCounter(Enum<?> arg0, long arg1) {
- }
-
- @Override
- public void incrCounter(String arg0, String arg1, long arg2) {
- }
-
- @Override
- public void setStatus(String arg0) {
- }
-
- @Override
- public void progress() {
- }
-
- @Override
- public float getProgress() {
- return 0.0f;
- }
- };
-
- return reporter;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
deleted file mode 100644
index 89abf0f..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.util.List;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-
-// This is an operator that takes a single file index and an array of secondary indexes
-// it is intended to be used for
-// 1. commit transaction operation
-// 2. abort transaction operation
-// 3. recover transaction operation
-public abstract class AbstractExternalDatasetIndexesOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
- private ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory;
- private IndexInfoOperatorDescriptor fileIndexInfo;
- private List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories;
- private List<IndexInfoOperatorDescriptor> bTreeIndexesInfos;
- private List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories;
- private List<IndexInfoOperatorDescriptor> rTreeIndexesInfos;
-
- public AbstractExternalDatasetIndexesOperatorDescriptor(IOperatorDescriptorRegistry spec,
- ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory,
- IndexInfoOperatorDescriptor fileIndexesInfo,
- List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories,
- List<IndexInfoOperatorDescriptor> bTreeIndexesInfos,
- List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories,
- List<IndexInfoOperatorDescriptor> rTreeIndexesInfos) {
- super(spec, 0, 0);
- this.filesIndexDataflowHelperFactory = filesIndexDataflowHelperFactory;
- this.fileIndexInfo = fileIndexesInfo;
- this.bTreeIndexesDataflowHelperFactories = bTreeIndexesDataflowHelperFactories;
- this.bTreeIndexesInfos = bTreeIndexesInfos;
- this.rTreeIndexesDataflowHelperFactories = rTreeIndexesDataflowHelperFactories;
- this.rTreeIndexesInfos = rTreeIndexesInfos;
- }
-
- // opening and closing the index is done inside these methods since we don't always need open indexes
- protected abstract void performOpOnIndex(
- IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
- IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception;
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- return new AbstractOperatorNodePushable() {
-
- @Override
- public void initialize() throws HyracksDataException {
- try {
- // only in partition of device id = 0, we perform the operation on the files index
- if(fileIndexInfo.getFileSplitProvider().getFileSplits()[partition].getIODeviceId() == 0){
- performOpOnIndex(filesIndexDataflowHelperFactory, ctx, fileIndexInfo, partition);
- }
- // perform operation on btrees
- for (int i = 0; i < bTreeIndexesDataflowHelperFactories.size(); i++) {
- performOpOnIndex(bTreeIndexesDataflowHelperFactories.get(i), ctx,
- bTreeIndexesInfos.get(i), partition);
- }
- // perform operation on rtrees
- for (int i = 0; i < rTreeIndexesDataflowHelperFactories.size(); i++) {
- performOpOnIndex(rTreeIndexesDataflowHelperFactories.get(i), ctx,
- rTreeIndexesInfos.get(i), partition);
- }
- } catch (Exception e) {
- // This should never happen <unless there is a hardware failure or something serious>
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void deinitialize() throws HyracksDataException {
- }
-
- @Override
- public int getInputArity() {
- return 0;
- }
-
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
- throws HyracksDataException {
- }
-
- @Override
- public IFrameWriter getInputFrameWriter(int index) {
- return null;
- }
-
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
deleted file mode 100644
index 6ff991b..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbortRecoverLSMIndexFileManager;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-
-public class ExternalDatasetIndexesAbortOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- public ExternalDatasetIndexesAbortOperatorDescriptor(IOperatorDescriptorRegistry spec,
- ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory,
- IndexInfoOperatorDescriptor fileIndexesInfo,
- List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories,
- List<IndexInfoOperatorDescriptor> bTreeIndexesInfos,
- List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories,
- List<IndexInfoOperatorDescriptor> rTreeIndexesInfos) {
- super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, bTreeIndexesDataflowHelperFactories,
- bTreeIndexesInfos, rTreeIndexesDataflowHelperFactories, rTreeIndexesInfos);
- }
-
- @Override
- protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
- IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
- AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
- fileManager.deleteTransactionFiles();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
deleted file mode 100644
index e89a8db..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-
-public class ExternalDatasetIndexesCommitOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor {
-
- public ExternalDatasetIndexesCommitOperatorDescriptor(IOperatorDescriptorRegistry spec,
- ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory,
- IndexInfoOperatorDescriptor fileIndexesInfo,
- List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories,
- List<IndexInfoOperatorDescriptor> bTreeIndexesInfos,
- List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories,
- List<IndexInfoOperatorDescriptor> rTreeIndexesInfos) {
- super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, bTreeIndexesDataflowHelperFactories,
- bTreeIndexesInfos, rTreeIndexesDataflowHelperFactories, rTreeIndexesInfos);
- }
-
- private static final long serialVersionUID = 1L;
-
- @Override
- protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
- IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- FileReference resourecePath = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
- System.err.println("performing the operation on "+ resourecePath.getFile().getAbsolutePath());
- // Get DataflowHelper
- IIndexDataflowHelper indexHelper = indexDataflowHelperFactory.createIndexDataflowHelper(fileIndexInfo, ctx, partition);
- // Get index
- IIndex index = indexHelper.getIndexInstance();
- // commit transaction
- ((ITwoPCIndex) index).commitTransaction();
- System.err.println("operation on "+ resourecePath.getFile().getAbsolutePath() + " Succeded");
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
deleted file mode 100644
index 9bdfaa6..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.io.File;
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbortRecoverLSMIndexFileManager;
-import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
-
-/**
- * Operator that runs transaction recovery over all indexes (files index, secondary B-trees,
- * and secondary R-trees) of an external dataset. For each index partition it delegates to
- * {@link AbortRecoverLSMIndexFileManager#recoverTransaction()} on the index's file set,
- * which cleans up the on-disk state left by an incomplete transaction.
- */
-public class ExternalDatasetIndexesRecoverOperatorDescriptor extends AbstractExternalDatasetIndexesOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- /**
-  * @param spec the job specification this operator belongs to
-  * @param filesIndexDataflowHelperFactory helper factory for the dataset's files index
-  * @param fileIndexesInfo index info for the files index
-  * @param bTreeIndexesDataflowHelperFactories helper factories for the secondary B-tree indexes
-  * @param bTreeIndexesInfos index infos matching the B-tree helper factories (parallel list)
-  * @param rTreeIndexesDataflowHelperFactories helper factories for the secondary R-tree indexes
-  * @param rTreeIndexesInfos index infos matching the R-tree helper factories (parallel list)
-  */
- public ExternalDatasetIndexesRecoverOperatorDescriptor(IOperatorDescriptorRegistry spec,
- ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory,
- IndexInfoOperatorDescriptor fileIndexesInfo,
- List<ExternalBTreeWithBuddyDataflowHelperFactory> bTreeIndexesDataflowHelperFactories,
- List<IndexInfoOperatorDescriptor> bTreeIndexesInfos,
- List<ExternalRTreeDataflowHelperFactory> rTreeIndexesDataflowHelperFactories,
- List<IndexInfoOperatorDescriptor> rTreeIndexesInfos) {
- super(spec, filesIndexDataflowHelperFactory, fileIndexesInfo, bTreeIndexesDataflowHelperFactories,
- bTreeIndexesInfos, rTreeIndexesDataflowHelperFactories, rTreeIndexesInfos);
- }
-
- // Recovers a single index partition: resolve its absolute file reference, then let the
- // abort/recover file manager roll back any uncommitted transaction artifacts on disk.
- // Note: indexDataflowHelperFactory is unused here; recovery works directly on the files.
- @Override
- protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
- IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
- FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
- AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
- fileManager.recoverTransaction();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorDescriptor.java
deleted file mode 100644
index f56b3ae..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorDescriptor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import org.apache.hyracks.storage.common.IStorageManagerInterface;
-import org.apache.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
-
-/**
- * Operator descriptor for bulk-modifying an external dataset index in a single transaction:
- * the produced pushable first marks the given file numbers as deleted, then bulk-loads the
- * incoming tuples. Has one input (the tuples to load) and no output.
- */
-public class ExternalIndexBulkModifyOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
- // File numbers to be marked deleted before the bulk load begins.
- private final int[] deletedFiles;
- // Mapping from input frame fields to index tuple fields.
- private final int[] fieldPermutation;
- // Target page fill factor passed to the bulk loader.
- private final float fillFactor;
- // Hint of the expected number of loaded elements (e.g. for bloom filter sizing
- // — presumably; TODO confirm how the underlying bulk loader uses it).
- private final long numElementsHint;
-
- /**
-  * @param spec the job specification
-  * @param storageManager storage manager interface for the target index
-  * @param lifecycleManagerProvider provider of the index lifecycle manager
-  * @param fileSplitProvider provider of the index file splits per partition
-  * @param typeTraits type traits of the index fields
-  * @param comparatorFactories comparators for the index key fields
-  * @param bloomFilterKeyFields key fields used by the bloom filter
-  * @param dataflowHelperFactory factory creating the index dataflow helper
-  * @param modificationOpCallbackFactory callback factory for modification operations
-  * @param deletedFiles file numbers to delete as part of this bulk modification
-  * @param fieldPermutation mapping from input fields to index tuple fields
-  * @param fillFactor bulk-load page fill factor
-  * @param numElementsHint expected number of elements to be loaded
-  */
- public ExternalIndexBulkModifyOperatorDescriptor(IOperatorDescriptorRegistry spec,
- IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
- IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
- IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
- IIndexDataflowHelperFactory dataflowHelperFactory,
- IModificationOperationCallbackFactory modificationOpCallbackFactory, int[] deletedFiles,
- int[] fieldPermutation, float fillFactor, long numElementsHint) {
- // 1 input, 0 outputs; no local resource creation (NoOpLocalResourceFactoryProvider)
- // and no search callback (NoOpOperationCallbackFactory) are needed for bulk modify.
- super(spec, 1, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false, false, null,
- NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
- modificationOpCallbackFactory);
- this.deletedFiles = deletedFiles;
- this.fieldPermutation = fieldPermutation;
- this.fillFactor = fillFactor;
- this.numElementsHint = numElementsHint;
- }
-
- /** Creates the runtime pushable that performs the transactional delete-then-load. */
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new ExternalIndexBulkModifyOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
- numElementsHint, recordDescProvider, deletedFiles);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
deleted file mode 100644
index a9c9ac7..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.operators;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader;
-import org.apache.hyracks.storage.am.common.api.IndexException;
-import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
-import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-
-/**
- * Runtime pushable for {@link ExternalIndexBulkModifyOperatorDescriptor}. Performs a
- * two-phase-commit (2PC) transactional bulk modification on an external index: on open it
- * creates a transaction bulk loader and stages delete tuples for all deleted file numbers,
- * then each incoming frame's tuples are added to the loader. close() ends the transaction
- * bulk load; fail() aborts it.
- */
-public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
-
- // Describes the schema of the files index / its buddy B-tree.
- private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
- // File numbers to stage as deletions before loading starts.
- private final int[] deletedFiles;
- // Reusable builder for the buddy-B-tree delete tuples.
- private ArrayTupleBuilder buddyBTreeTupleBuilder = new ArrayTupleBuilder(
- filesIndexDescription.FILE_BUDDY_BTREE_RECORD_DESCRIPTOR.getFieldCount());
- // Mutable holder for the current file number being deleted (avoids per-tuple allocation).
- private AMutableInt32 fileNumber = new AMutableInt32(0);
- // Reusable reference pointing at the tuple built by buddyBTreeTupleBuilder.
- private ArrayTupleReference deleteTuple = new ArrayTupleReference();
-
- public ExternalIndexBulkModifyOperatorNodePushable(ExternalIndexBulkModifyOperatorDescriptor opDesc,
- IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, long numElementsHint,
- IRecordDescriptorProvider recordDescProvider, int[] deletedFiles) {
- super(opDesc, ctx, partition, fieldPermutation, fillFactor, false, numElementsHint, false, recordDescProvider);
- this.deletedFiles = deletedFiles;
- }
-
- // We override this method to do two things
- // when creating the bulkLoader, it creates a transaction bulk loader
- // It uses the bulkLoader to insert delete tuples for the deleted files
- @Override
- public void open() throws HyracksDataException {
- RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(recDesc);
- indexHelper.open();
- index = indexHelper.getIndexInstance();
- try {
- writer.open();
- // Transactional BulkLoader
- bulkLoader = ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length,
- checkIfEmptyIndex);
- // Delete files: build one buddy-B-tree tuple per deleted file number and stage
- // it as a delete in the transaction bulk loader.
- for (int i = 0; i < deletedFiles.length; i++) {
- fileNumber.setValue(deletedFiles[i]);
- filesIndexDescription.getBuddyBTreeTupleFromFileNumber(deleteTuple, buddyBTreeTupleBuilder, fileNumber);
- ((ITwoPCIndexBulkLoader) bulkLoader).delete(deleteTuple);
- }
- } catch (Throwable e) {
- // NOTE(review): broad Throwable catch wraps everything (including Errors) into
- // HyracksDataException; kept as-is since callers rely on that exception type.
- throw new HyracksDataException(e);
- }
- }
-
- /** Adds every tuple of the incoming frame to the transaction bulk loader. */
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- try {
- bulkLoader.add(tuple);
- } catch (IndexException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- // Ends the transaction bulk load, then closes the index helper and the downstream
- // writer even if ending the load fails (nested finally preserves both closes).
- @Override
- public void close() throws HyracksDataException {
- if (index != null) {
- try {
- bulkLoader.end();
- } catch (Throwable th) {
- throw new HyracksDataException(th);
- } finally {
- try {
- indexHelper.close();
- } finally {
- writer.close();
- }
- }
- }
- }
-
- // Aborts the staged transaction bulk load and propagates failure downstream.
- @Override
- public void fail() throws HyracksDataException {
- if (index != null) {
- try {
- ((ITwoPCIndexBulkLoader) bulkLoader).abort();
- } finally {
- writer.fail();
- }
- }
- }
-}