You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:14 UTC
[16/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java
new file mode 100644
index 0000000..07d09db
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import org.apache.asterix.om.base.AInt32;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
/**
 * {@link RecordIdReader} specialization for RC (record-columnar) formatted files.
 * On top of the file number and record-group offset decoded by the parent class,
 * it also deserializes the row number of the record within its record group.
 */
public class RCRecordIdReader extends RecordIdReader {

    public RCRecordIdReader(int[] ridFields) {
        super(ridFields);
    }

    /**
     * Reads the record id of the tuple at {@code index}, including the row number.
     *
     * @return the (reused) RecordId, or {@code null} when the parent reader found
     *         a NULL record-id field for this tuple
     */
    @Override
    public RecordId read(int index) throws HyracksDataException {
        // Let the parent decode file number and record-group offset first; it also
        // sets tupleStartOffset/frameBuffer, which the row-number read below relies on.
        if (super.read(index) == null) {
            return null;
        }
        // Get row number
        bbis.setByteBuffer(frameBuffer, tupleStartOffset
                + tupleAccessor.getFieldStartOffset(index, ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX]));
        rid.setRow(
                ((AInt32) inRecDesc.getFields()[ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX]].deserialize(dis))
                        .getIntegerValue());
        return rid;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
new file mode 100644
index 0000000..14235c0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
/**
 * External indexer for record-columnar HDFS files. Appends three indexing fields
 * to every indexed tuple: the file number, the byte offset of the record's
 * record group, and the record's row number within that group.
 */
public class RecordColumnarIndexer implements IExternalIndexer {

    private static final long serialVersionUID = 1L;
    // Indexing fields appended per record: file number, record-group offset, row number.
    public static final int NUM_OF_FIELDS = 3;
    protected AMutableInt32 fileNumber = new AMutableInt32(0);
    protected AMutableInt64 offset = new AMutableInt64(0);
    // Reader position observed for the previous record; a change signals a new record group.
    protected long nextOffset;
    protected AMutableInt32 rowNumber = new AMutableInt32(0);
    protected RecordReader<?, Writable> recordReader;

    @SuppressWarnings("unchecked")
    private ISerializerDeserializer<IAObject> intSerde = AqlSerializerDeserializerProvider.INSTANCE
            .getSerializerDeserializer(BuiltinType.AINT32);
    @SuppressWarnings("unchecked")
    private ISerializerDeserializer<IAObject> longSerde = AqlSerializerDeserializerProvider.INSTANCE
            .getSerializerDeserializer(BuiltinType.AINT64);

    /**
     * Re-binds this indexer to the reader's current split: captures the split's
     * file number and the reader's starting position, and resets the row counter.
     */
    @Override
    public void reset(IRecordReader<?> reader) throws IOException {
        //TODO: Make this more generic. right now, it works because we only index hdfs files.
        @SuppressWarnings("unchecked")
        HDFSRecordReader<?, Writable> hdfsReader = (HDFSRecordReader<?, Writable>) reader;
        fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber());
        recordReader = hdfsReader.getReader();
        offset.setValue(recordReader.getPos());
        nextOffset = offset.getLongValue();
        rowNumber.setValue(0);
    }

    /**
     * Appends (file number, group offset, row number) to the tuple being built
     * and advances the row counter. When the underlying reader's position has
     * moved since the last record, a new record group has started, so the group
     * offset is updated and the row counter restarts at 0.
     */
    @Override
    public void index(ArrayTupleBuilder tb) throws IOException {
        if (recordReader.getPos() != nextOffset) {
            // start of a new group
            offset.setValue(nextOffset);
            nextOffset = recordReader.getPos();
            rowNumber.setValue(0);
        }
        tb.addField(intSerde, fileNumber);
        tb.addField(longSerde, offset);
        tb.addField(intSerde, rowNumber);
        rowNumber.setValue(rowNumber.getIntegerValue() + 1);
    }

    @Override
    public int getNumberOfFields() {
        return NUM_OF_FIELDS;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java
new file mode 100644
index 0000000..9027101
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
/**
 * Mutable identifier locating a record inside an external file: the file
 * number, the byte offset of the record (or of its record group), and — for
 * columnar formats — the row number within that group.
 */
public class RecordId {

    /** Addressing scheme used to locate a record within its file. */
    public static enum RecordIdType {
        OFFSET,
        RC
    }

    private int file;
    private long byteOffset;
    private int rowNumber;

    /** @return the number identifying the file this record lives in */
    public int getFileId() {
        return file;
    }

    public void setFileId(int fileId) {
        this.file = fileId;
    }

    /** @return the byte offset of the record (or its record group) within the file */
    public long getOffset() {
        return byteOffset;
    }

    public void setOffset(long offset) {
        this.byteOffset = offset;
    }

    /** @return the row number within the record group (columnar formats only) */
    public int getRow() {
        return rowNumber;
    }

    public void setRow(int row) {
        this.rowNumber = row;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java
new file mode 100644
index 0000000..2b4cc9c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
/**
 * Decodes a {@link RecordId} (file number + record offset) from a tuple inside
 * a frame. Must be bound to a frame accessor via {@link #set} before use.
 * Subclasses may decode additional rid fields (see RCRecordIdReader).
 */
public class RecordIdReader {

    // Serialized NULL type tag; used to detect a null record-id field.
    private final static byte nullByte = ATypeTag.NULL.serialize();
    protected FrameTupleAccessor tupleAccessor;
    // Length of the tuple's field-slot header; added to field offsets to get absolute positions.
    protected int fieldSlotsLength;
    // Positions of the record-id fields within the input record descriptor.
    protected int[] ridFields;
    // Single reusable holder returned by read(); callers must not retain it across calls.
    protected RecordId rid;
    protected RecordDescriptor inRecDesc;
    protected ByteBufferInputStream bbis;
    protected DataInputStream dis;
    // Absolute start offset (within the frame) of the tuple currently being read.
    protected int tupleStartOffset;
    protected ByteBuffer frameBuffer;

    public RecordIdReader(int[] ridFields) {
        this.ridFields = ridFields;
        this.rid = new RecordId();
    }

    /**
     * Binds this reader to a frame tuple accessor and its record descriptor.
     * Must be called before {@link #read(int)}.
     */
    public void set(FrameTupleAccessor accessor, RecordDescriptor inRecDesc) {
        this.tupleAccessor = accessor;
        this.fieldSlotsLength = accessor.getFieldSlotsLength();
        this.inRecDesc = inRecDesc;
        this.bbis = new ByteBufferInputStream();
        this.dis = new DataInputStream(bbis);
    }

    /**
     * Decodes the record id of the tuple at {@code index}.
     *
     * @param index tuple index within the currently accessed frame
     * @return the (reused) RecordId, or {@code null} when the file-number field
     *         carries a NULL type tag — NOTE(review): presumably an unmatched
     *         tuple from an outer join; confirm against callers
     * @throws HyracksDataException if deserialization of a rid field fails
     */
    public RecordId read(int index) throws HyracksDataException {
        tupleStartOffset = tupleAccessor.getTupleStartOffset(index) + fieldSlotsLength;
        int fileNumberStartOffset = tupleAccessor.getFieldStartOffset(index,
                ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]);
        frameBuffer = tupleAccessor.getBuffer();
        // A NULL tag in the file-number field means there is no record id for this tuple.
        if (frameBuffer.get(tupleStartOffset + fileNumberStartOffset) == nullByte) {
            return null;
        }
        // Get file number
        bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset);
        rid.setFileId(
                ((AInt32) inRecDesc.getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis))
                        .getIntegerValue());
        // Get record group offset
        bbis.setByteBuffer(frameBuffer, tupleStartOffset
                + tupleAccessor.getFieldStartOffset(index, ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
        rid.setOffset(((AInt64) inRecDesc.getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]]
                .deserialize(dis)).getLongValue());
        return rid;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java
new file mode 100644
index 0000000..d0bf2ff
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.indexing;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.HDFSUtils;
+
+public class RecordIdReaderFactory {
+
+ public static RecordIdReader create(Map<String, String> configuration, int[] ridFields) throws AsterixException {
+ switch (HDFSUtils.getRecordIdType(configuration)) {
+ case OFFSET:
+ return new RecordIdReader(ridFields);
+ case RC:
+ return new RCRecordIdReader(ridFields);
+ default:
+ throw new AsterixException("Unknown Record Id type: " + HDFSUtils.getRecordIdType(configuration));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java
deleted file mode 100644
index 07e09bd..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.DataOutput;
-import java.io.InputStream;
-
-import org.apache.asterix.external.indexing.input.AbstractHDFSReader;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-public abstract class AbstractIndexingTupleParser implements ITupleParser{
-
- protected ArrayTupleBuilder tb;
- protected DataOutput dos;
- protected final FrameTupleAppender appender;
- protected final ARecordType recType;
- protected final IHyracksCommonContext ctx;
- protected final IAsterixHDFSRecordParser deserializer;
- protected final AMutableInt32 aMutableInt = new AMutableInt32(0);
- protected final AMutableInt64 aMutableLong = new AMutableInt64(0);
-
- @SuppressWarnings("rawtypes")
- protected final ISerializerDeserializer intSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- @SuppressWarnings("rawtypes")
- protected final ISerializerDeserializer longSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-
- public AbstractIndexingTupleParser(IHyracksCommonContext ctx, ARecordType recType, IAsterixHDFSRecordParser
- deserializer) throws HyracksDataException {
- appender = new FrameTupleAppender(new VSizeFrame(ctx));
- this.recType = recType;
- this.ctx = ctx;
- this.deserializer = deserializer;
- }
-
- @Override
- public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
- AbstractHDFSReader inReader = (AbstractHDFSReader) in;
- Object record;
- try {
- inReader.initialize();
- record = inReader.readNext();
- while (record != null) {
- tb.reset();
- deserializer.parse(record, tb.getDataOutput());
- tb.addFieldEndOffset();
- //append indexing fields
- appendIndexingData(tb, inReader);
- addTupleToFrame(writer);
- record = inReader.readNext();
- }
- appender.flush(writer, true);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- protected abstract void appendIndexingData(ArrayTupleBuilder tb,
- AbstractHDFSReader inReader) throws Exception;
-
- protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.flush(writer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException("Record is too big to fit in a frame");
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
deleted file mode 100644
index c94be6a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.indexing.input.AbstractHDFSLookupInputStream;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.runtime.operators.file.IDataParser;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.INullWriter;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-
-/**
- * class implementation for IControlledTupleParser. It provides common
- * functionality involved in parsing data in an external text format (adm or delimited text) in a pipelined manner and packing
- * frames with formed tuples.
- */
-public class AdmOrDelimitedControlledTupleParser implements IControlledTupleParser {
-
- private ArrayTupleBuilder tb;
- private transient DataOutput dos;
- private final FrameTupleAppender appender;
- protected final ARecordType recType;
- private IDataParser parser;
- private boolean propagateInput;
- private int[] propagatedFields;
- private int[] ridFields;
- private RecordDescriptor inRecDesc;
- private FrameTupleAccessor tupleAccessor;
- private FrameTupleReference frameTuple;
- private ByteBufferInputStream bbis;
- private DataInputStream dis;
- private AbstractHDFSLookupInputStream in;
- private boolean parserInitialized = false;
- private boolean retainNull;
- protected byte nullByte;
- protected ArrayTupleBuilder nullTupleBuild;
-
- public AdmOrDelimitedControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType,
- AbstractHDFSLookupInputStream in, boolean propagateInput, RecordDescriptor inRecDesc, IDataParser parser,
- int[] propagatedFields, int[] ridFields, boolean retainNull, INullWriterFactory iNullWriterFactory)
- throws HyracksDataException {
- this.recType = recType;
- this.in = in;
- this.propagateInput = propagateInput;
- this.retainNull = retainNull;
- this.inRecDesc = inRecDesc;
- this.propagatedFields = propagatedFields;
- this.ridFields = ridFields;
- this.parser = parser;
- this.tupleAccessor = new FrameTupleAccessor(inRecDesc);
- appender = new FrameTupleAppender(new VSizeFrame(ctx));
- if (propagateInput) {
- tb = new ArrayTupleBuilder(propagatedFields.length + 1);
- } else {
- tb = new ArrayTupleBuilder(1);
- }
- frameTuple = new FrameTupleReference();
- dos = tb.getDataOutput();
- bbis = new ByteBufferInputStream();
- dis = new DataInputStream(bbis);
- nullByte = ATypeTag.NULL.serialize();
- if (retainNull) {
- INullWriter nullWriter = iNullWriterFactory.createNullWriter();
- nullTupleBuild = new ArrayTupleBuilder(1);
- DataOutput out = nullTupleBuild.getDataOutput();
- try {
- nullWriter.writeNull(out);
- } catch (IOException e) {
- e.printStackTrace();
- }
- } else {
- nullTupleBuild = null;
- }
- }
-
- @Override
- public void close(IFrameWriter writer) throws Exception {
- try {
- in.close();
- appender.flush(writer, true);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void parseNext(IFrameWriter writer, ByteBuffer frameBuffer) throws HyracksDataException {
- try {
- int tupleCount = 0;
- int tupleIndex = 0;
- tupleAccessor.reset(frameBuffer);
- tupleCount = tupleAccessor.getTupleCount();
- int fieldSlotsLength = tupleAccessor.getFieldSlotsLength();
- // Loop over tuples
- while (tupleIndex < tupleCount) {
- boolean found = false;
- int tupleStartOffset = tupleAccessor.getTupleStartOffset(tupleIndex) + fieldSlotsLength;
- int fileNumberStartOffset = tupleAccessor.getFieldStartOffset(tupleIndex,
- ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]);
- // Check if null <- for outer join ->
- if (frameBuffer.get(tupleStartOffset + fileNumberStartOffset) == nullByte) {
- } else {
- // Get file number
- bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset);
- int fileNumber = ((AInt32) inRecDesc
- .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis))
- .getIntegerValue();
- // Get record offset
- bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
- ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
- long recordOffset = ((AInt64) inRecDesc
- .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]].deserialize(dis))
- .getLongValue();
- found = in.fetchRecord(fileNumber, recordOffset);
- }
- if (found) {
- // Since we now know the inputStream is ready, we can safely initialize the parser
- // We can't do that earlier since the parser will start pulling from the stream and if it is not ready,
- // The parser will automatically release its resources
- if (!parserInitialized) {
- parser.initialize(in, recType, true);
- parserInitialized = true;
- }
- tb.reset();
- if (propagateInput) {
- frameTuple.reset(tupleAccessor, tupleIndex);
- for (int i = 0; i < propagatedFields.length; i++) {
- dos.write(frameTuple.getFieldData(propagatedFields[i]),
- frameTuple.getFieldStart(propagatedFields[i]),
- frameTuple.getFieldLength(propagatedFields[i]));
- tb.addFieldEndOffset();
- }
- }
- parser.parse(tb.getDataOutput());
- tb.addFieldEndOffset();
- addTupleToFrame(writer);
- } else if (propagateInput && retainNull) {
- tb.reset();
- frameTuple.reset(tupleAccessor, tupleIndex);
- for (int i = 0; i < propagatedFields.length; i++) {
- dos.write(frameTuple.getFieldData(propagatedFields[i]),
- frameTuple.getFieldStart(propagatedFields[i]),
- frameTuple.getFieldLength(propagatedFields[i]));
- tb.addFieldEndOffset();
- }
- dos.write(nullTupleBuild.getByteArray());
- tb.addFieldEndOffset();
- addTupleToFrame(writer);
- }
- tupleIndex++;
- }
- } catch (Exception e) {
- // un expected error, we try to close the inputstream and throw an exception
- try {
- in.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- throw new HyracksDataException(e);
- }
- }
-
- // For debugging
- public void prettyPrint(FrameTupleAccessor tupleAccessor, RecordDescriptor recDesc) {
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream dis = new DataInputStream(bbis);
- int tc = tupleAccessor.getTupleCount();
- System.err.println("TC: " + tc);
- for (int i = 0; i < tc; ++i) {
- System.err.print(
- i + ":(" + tupleAccessor.getTupleStartOffset(i) + ", " + tupleAccessor.getTupleEndOffset(i) + ")[");
- for (int j = 0; j < tupleAccessor.getFieldCount(); ++j) {
- System.err.print(j + ":(" + tupleAccessor.getFieldStartOffset(i, j) + ", "
- + tupleAccessor.getFieldEndOffset(i, j) + ") ");
- System.err.print("{");
- bbis.setByteBuffer(tupleAccessor.getBuffer(), tupleAccessor.getTupleStartOffset(i)
- + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(i, j));
- try {
- byte tag = dis.readByte();
- if (tag == nullByte) {
- System.err.print("NULL");
- } else {
- bbis.setByteBuffer(tupleAccessor.getBuffer(), tupleAccessor.getTupleStartOffset(i)
- + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(i, j));
- System.err.print(recDesc.getFields()[j].deserialize(dis));
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- System.err.print("}");
- }
- System.err.println("]");
- }
- }
-
- protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.flush(writer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java
deleted file mode 100644
index 6abcbb8..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.InputStream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.indexing.input.AbstractHDFSReader;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.runtime.operators.file.IDataParser;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-public class AdmOrDelimitedIndexingTupleParser implements ITupleParser {
-
- private ArrayTupleBuilder tb;
- private final FrameTupleAppender appender;
- private final ARecordType recType;
- private final IDataParser parser;
- private final AMutableInt32 aMutableInt = new AMutableInt32(0);
- private final AMutableInt64 aMutableLong = new AMutableInt64(0);
-
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer intSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer longSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
-
- public AdmOrDelimitedIndexingTupleParser(IHyracksCommonContext ctx, ARecordType recType, IDataParser parser)
- throws HyracksDataException {
- this.parser = parser;
- this.recType = recType;
- appender = new FrameTupleAppender(new VSizeFrame(ctx));
- tb = new ArrayTupleBuilder(3);
- }
-
- @Override
- public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
- // Cast the input stream to a record reader
- AbstractHDFSReader inReader = (AbstractHDFSReader) in;
- try {
- parser.initialize(in, recType, true);
- while (true) {
- tb.reset();
- if (!parser.parse(tb.getDataOutput())) {
- break;
- }
- tb.addFieldEndOffset();
- appendIndexingData(tb, inReader);
- addTupleToFrame(writer);
- }
- appender.flush(writer, true);
- } catch (AsterixException ae) {
- throw new HyracksDataException(ae);
- } catch (Exception ioe) {
- throw new HyracksDataException(ioe);
- }
- }
-
- // This function is used to append RID to Hyracks tuple
- @SuppressWarnings("unchecked")
- private void appendIndexingData(ArrayTupleBuilder tb, AbstractHDFSReader inReader) throws Exception {
- aMutableInt.setValue(inReader.getFileNumber());
- aMutableLong.setValue(inReader.getReaderPosition());
- tb.addField(intSerde, aMutableInt);
- tb.addField(longSerde, aMutableLong);
- }
-
- private void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.flush(writer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException("Record is too big to fit in a frame");
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
deleted file mode 100644
index 9271ebe..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.IOException;
-
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.FilesIndexDescription;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.base.AMutableDateTime;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-@SuppressWarnings("unchecked")
-public class FileIndexTupleTranslator {
- private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
- filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFieldCount());
- private RecordBuilder recordBuilder = new RecordBuilder();
- private ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableString aString = new AMutableString(null);
- private AMutableDateTime aDateTime = new AMutableDateTime(0);
- private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ASTRING);
- private ISerializerDeserializer<ADateTime> dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADATETIME);
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- private ArrayTupleReference tuple = new ArrayTupleReference();
-
- public ITupleReference getTupleFromFile(ExternalFile file) throws IOException, AsterixException {
- tupleBuilder.reset();
- //File Number
- aInt32.setValue(file.getFileNumber());
- filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32,
- tupleBuilder.getDataOutput());
- tupleBuilder.addFieldEndOffset();
-
- //File Record
- recordBuilder.reset(filesIndexDescription.EXTERNAL_FILE_RECORD_TYPE);
- // write field 0 (File Name)
- fieldValue.reset();
- aString.setValue(file.getFileName());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(0, fieldValue);
-
- //write field 1 (File Size)
- fieldValue.reset();
- aInt64.setValue(file.getSize());
- longSerde.serialize(aInt64, fieldValue.getDataOutput());
- recordBuilder.addField(1, fieldValue);
-
- //write field 2 (File Mod Date)
- fieldValue.reset();
- aDateTime.setValue(file.getLastModefiedTime().getTime());
- dateTimeSerde.serialize(aDateTime, fieldValue.getDataOutput());
- recordBuilder.addField(2, fieldValue);
-
- //write the record
- recordBuilder.write(tupleBuilder.getDataOutput(), true);
- tupleBuilder.addFieldEndOffset();
- tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
- return tuple;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
deleted file mode 100644
index b38b835..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.ADMDataParser;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * This is the parser factory for parsers used to do indexing
- */
-public class HDFSIndexingParserFactory implements ITupleParserFactory {
-
- private static final long serialVersionUID = 1L;
- // file input-format <text, seq, rc>
- private final String inputFormat;
- // content format <adm, delimited-text, binary>
- private final String format;
- // delimiter in case of delimited text
- private final char delimiter;
- // quote in case of delimited text
- private final char quote;
- // parser class name in case of binary format
- private final String parserClassName;
- // the expected data type
- private final ARecordType atype;
- // the hadoop job conf
- private transient JobConf jobConf;
- // adapter arguments
- private Map<String, String> arguments;
-
- public HDFSIndexingParserFactory(ARecordType atype, String inputFormat, String format, char delimiter, char quote,
- String parserClassName) {
- this.inputFormat = inputFormat;
- this.format = format;
- this.parserClassName = parserClassName;
- this.delimiter = delimiter;
- this.quote = quote;
- this.atype = atype;
- }
-
- @Override
- public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException {
- if (format == null) {
- throw new IllegalArgumentException("Unspecified data format");
- }
- if (inputFormat == null) {
- throw new IllegalArgumentException("Unspecified data format");
- }
- if (!inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_RC)
- && !inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_TEXT)
- && !inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE)) {
- throw new IllegalArgumentException("External Indexing not supportd for format " + inputFormat);
- }
- // Do some real work here
- /*
- * Choices are:
- * 1. TxtOrSeq (Object) indexing tuple parser
- * 2. RC indexing tuple parser
- * 3. textual data tuple parser
- */
- if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_ADM)) {
- // choice 3 with adm data parser
- ADMDataParser dataParser = new ADMDataParser();
- return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
- } else if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
- // choice 3 with delimited data parser
- DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype, delimiter, quote);
- return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
- }
-
- // binary data with a special parser --> create the parser
- IAsterixHDFSRecordParser objectParser;
- if (parserClassName.equalsIgnoreCase(HDFSAdapterFactory.PARSER_HIVE)) {
- objectParser = new HiveObjectParser();
- } else {
- try {
- objectParser = (IAsterixHDFSRecordParser) Class.forName(parserClassName).newInstance();
- } catch (Exception e) {
- throw new HyracksDataException("Unable to create object parser", e);
- }
- }
- try {
- objectParser.initialize(atype, arguments, jobConf);
- } catch (Exception e) {
- throw new HyracksDataException("Unable to initialize object parser", e);
- }
-
- if (inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
- // Case 2
- return new RCFileIndexingTupleParser(ctx, atype, objectParser);
- } else {
- // Case 1
- return new TextOrSeqIndexingTupleParser(ctx, atype, objectParser);
- }
- }
-
- public JobConf getJobConf() {
- return jobConf;
- }
-
- public void setJobConf(JobConf jobConf) {
- this.jobConf = jobConf;
- }
-
- public Map<String, String> getArguments() {
- return arguments;
- }
-
- public void setArguments(Map<String, String> arguments) {
- this.arguments = arguments;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
deleted file mode 100644
index d9ce7aa..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
-import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.input.RCFileLookupReader;
-import org.apache.asterix.external.indexing.input.SequenceFileLookupInputStream;
-import org.apache.asterix.external.indexing.input.SequenceFileLookupReader;
-import org.apache.asterix.external.indexing.input.TextFileLookupInputStream;
-import org.apache.asterix.external.indexing.input.TextFileLookupReader;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.ADMDataParser;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class HDFSLookupAdapter implements IControlledAdapter, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private RecordDescriptor inRecDesc;
- private boolean propagateInput;
- private int[] ridFields;
- private int[] propagatedFields;
- private IAType atype;
- private Map<String, String> configuration;
- private IHyracksTaskContext ctx;
- private IControlledTupleParser parser;
- private ExternalFileIndexAccessor fileIndexAccessor;
- private boolean retainNull;
-
- public HDFSLookupAdapter(IAType atype, RecordDescriptor inRecDesc, Map<String, String> adapterConfiguration,
- boolean propagateInput, int[] ridFields, int[] propagatedFields, IHyracksTaskContext ctx,
- ExternalFileIndexAccessor fileIndexAccessor, boolean retainNull) {
- this.configuration = adapterConfiguration;
- this.atype = atype;
- this.ctx = ctx;
- this.inRecDesc = inRecDesc;
- this.propagatedFields = propagatedFields;
- this.propagateInput = propagateInput;
- this.propagatedFields = propagatedFields;
- this.fileIndexAccessor = fileIndexAccessor;
- this.ridFields = ridFields;
- this.retainNull = retainNull;
- }
-
- /*
- * This function is not easy to read and could be refactored into a better structure but for now it works
- */
- @Override
- public void initialize(IHyracksTaskContext ctx, INullWriterFactory iNullWriterFactory) throws Exception {
- JobConf jobConf = HDFSAdapterFactory.configureJobConf(configuration);
- // Create the lookup reader and the controlled parser
- if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
- configureRCFile(jobConf, iNullWriterFactory);
- } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
- .equals(AsterixTupleParserFactory.FORMAT_ADM)) {
- // create an adm parser
- ADMDataParser dataParser = new ADMDataParser();
- if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
- // Text input format
- TextFileLookupInputStream in = new TextFileLookupInputStream(fileIndexAccessor, jobConf);
- parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
- inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
- } else {
- // Sequence input format
- SequenceFileLookupInputStream in = new SequenceFileLookupInputStream(fileIndexAccessor, jobConf);
- parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
- inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
- }
- } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
- .equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
- // create a delimited text parser
- char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
- char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
-
- DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser((ARecordType) atype,
- delimiter, quote);
- if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
- // Text input format
- TextFileLookupInputStream in = new TextFileLookupInputStream(fileIndexAccessor, jobConf);
- parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
- inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
- } else {
- // Sequence input format
- SequenceFileLookupInputStream in = new SequenceFileLookupInputStream(fileIndexAccessor, jobConf);
- parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
- inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
- }
- } else {
- configureGenericSeqOrText(jobConf, iNullWriterFactory);
- }
- }
-
- private void configureGenericSeqOrText(JobConf jobConf, INullWriterFactory iNullWriterFactory) throws IOException {
- if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
- // Text input format
- TextFileLookupReader reader = new TextFileLookupReader(fileIndexAccessor, jobConf);
- parser = new SeqOrTxtControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput,
- propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory);
- } else {
- // Sequence input format
- SequenceFileLookupReader reader = new SequenceFileLookupReader(fileIndexAccessor, jobConf);
- parser = new SeqOrTxtControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput,
- propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory);
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer, IFrameWriter writer) throws Exception {
- parser.parseNext(writer, buffer);
- }
-
- @Override
- public void close(IFrameWriter writer) throws Exception {
- parser.close(writer);
- }
-
- @Override
- public void fail() throws Exception {
- // Do nothing
- }
-
- private void configureRCFile(Configuration jobConf, INullWriterFactory iNullWriterFactory)
- throws IOException, Exception {
- // RCFileLookupReader
- RCFileLookupReader reader = new RCFileLookupReader(fileIndexAccessor,
- HDFSAdapterFactory.configureJobConf(configuration));
- parser = new RCFileControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput,
- propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory);
- }
-
- private IAsterixHDFSRecordParser createRecordParser(Configuration jobConf) throws HyracksDataException {
- // Create the record parser
- // binary data with a special parser --> create the parser
- IAsterixHDFSRecordParser objectParser;
- if (configuration.get(HDFSAdapterFactory.KEY_PARSER).equals(HDFSAdapterFactory.PARSER_HIVE)) {
- objectParser = new HiveObjectParser();
- } else {
- try {
- objectParser = (IAsterixHDFSRecordParser) Class
- .forName(configuration.get(HDFSAdapterFactory.KEY_PARSER)).newInstance();
- } catch (Exception e) {
- throw new HyracksDataException("Unable to create object parser", e);
- }
- }
- // initialize the parser
- try {
- objectParser.initialize((ARecordType) atype, configuration, jobConf);
- } catch (Exception e) {
- throw new HyracksDataException("Unable to initialize object parser", e);
- }
-
- return objectParser;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
deleted file mode 100644
index fab507d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.external.adapter.factory.IControlledAdapterFactory;
-import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-// This class takes care of creating the adapter based on the formats and input format
-public class HDFSLookupAdapterFactory implements IControlledAdapterFactory {
-
- private static final long serialVersionUID = 1L;
-
- private Map<String, String> adapterConfiguration;
- private IAType atype;
- private boolean propagateInput;
- private int[] ridFields;
- private int[] propagatedFields;
- private boolean retainNull;
-
- @Override
- public void configure(IAType atype, boolean propagateInput, int[] ridFields,
- Map<String, String> adapterConfiguration, boolean retainNull) {
- this.adapterConfiguration = adapterConfiguration;
- this.atype = atype;
- this.propagateInput = propagateInput;
- this.ridFields = ridFields;
- this.retainNull = retainNull;
- }
-
- @Override
- public IControlledAdapter createAdapter(IHyracksTaskContext ctx, ExternalFileIndexAccessor fileIndexAccessor,
- RecordDescriptor inRecDesc) {
- if (propagateInput) {
- configurePropagatedFields(inRecDesc);
- }
- return new HDFSLookupAdapter(atype, inRecDesc, adapterConfiguration, propagateInput, ridFields,
- propagatedFields, ctx, fileIndexAccessor, retainNull);
- }
-
- private void configurePropagatedFields(RecordDescriptor inRecDesc) {
- int ptr = 0;
- boolean skip = false;
- propagatedFields = new int[inRecDesc.getFieldCount() - ridFields.length];
- for (int i = 0; i < inRecDesc.getFieldCount(); i++) {
- if (ptr < ridFields.length) {
- skip = false;
- for (int j = 0; j < ridFields.length; j++) {
- if (ridFields[j] == i) {
- ptr++;
- skip = true;
- break;
- }
- }
- if (!skip)
- propagatedFields[i - ptr] = i;
- } else {
- propagatedFields[i - ptr] = i;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java
deleted file mode 100644
index f42a6d1..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.io.InputStream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.indexing.input.AbstractHDFSReader;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-/*
- * This class is used with hdfs objects instead of hdfs
- */
-public class HDFSObjectTupleParser implements ITupleParser{
-
- private ArrayTupleBuilder tb;
- private final FrameTupleAppender appender;
- private IAsterixHDFSRecordParser deserializer;
-
- public HDFSObjectTupleParser(IHyracksCommonContext ctx, ARecordType recType, IAsterixHDFSRecordParser deserializer)
- throws HyracksDataException {
- appender = new FrameTupleAppender(new VSizeFrame(ctx));
- this.deserializer = deserializer;
- tb = new ArrayTupleBuilder(1);
- }
-
- @Override
- public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
- AbstractHDFSReader reader = (AbstractHDFSReader) in;
- Object object;
- try {
- reader.initialize();
- object = reader.readNext();
- while (object!= null) {
- tb.reset();
- deserializer.parse(object, tb.getDataOutput());
- tb.addFieldEndOffset();
- addTupleToFrame(writer);
- object = reader.readNext();
- }
- appender.flush(writer, true);
- } catch (AsterixException ae) {
- throw new HyracksDataException(ae);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.flush(writer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java
deleted file mode 100644
index ac3a92f..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.indexing.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class HDFSObjectTupleParserFactory implements ITupleParserFactory{
- private static final long serialVersionUID = 1L;
- // parser class name in case of binary format
- private String parserClassName;
- // the expected data type
- private ARecordType atype;
- // the hadoop job conf
- private HDFSAdapterFactory adapterFactory;
- // adapter arguments
- private Map<String,String> arguments;
-
- public HDFSObjectTupleParserFactory(ARecordType atype, HDFSAdapterFactory adapterFactory, Map<String,String> arguments){
- this.parserClassName = (String) arguments.get(HDFSAdapterFactory.KEY_PARSER);
- this.atype = atype;
- this.arguments = arguments;
- this.adapterFactory = adapterFactory;
- }
-
- @Override
- public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException {
- IAsterixHDFSRecordParser objectParser;
- if (parserClassName.equals(HDFSAdapterFactory.PARSER_HIVE)) {
- objectParser = new HiveObjectParser();
- } else {
- try {
- objectParser = (IAsterixHDFSRecordParser) Class.forName(parserClassName).newInstance();
- } catch (Exception e) {
- throw new HyracksDataException("Unable to create object parser", e);
- }
- }
- try {
- objectParser.initialize(atype, arguments, adapterFactory.getJobConf());
- } catch (Exception e) {
- throw new HyracksDataException("Unable to initialize object parser", e);
- }
-
- return new HDFSObjectTupleParser(ctx, atype, objectParser);
- }
-
-}