You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2012/10/22 20:10:58 UTC
svn commit: r1400977 - in
/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata:
VXQueryCollectionOperatorDescriptor.java VXQueryMetadataProvider.java
Author: prestonc
Date: Mon Oct 22 18:10:58 2012
New Revision: 1400977
URL: http://svn.apache.org/viewvc?rev=1400977&view=rev
Log:
Added the link for the runtime to get the files from collection into frames. Now it gets an activity graph null pointer exception.
Added:
incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java (with props)
Modified:
incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
Added: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java?rev=1400977&view=auto
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java (added)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java Mon Oct 22 18:10:58 2012
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.metadata;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.vxquery.runtime.functions.util.FunctionHelper;
+import org.xml.sax.InputSource;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private String collectionName;
+
+ public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, String collectionName, RecordDescriptor rDesc) {
+ super(spec, 0, 1);
+ this.collectionName = collectionName;
+ recordDescriptors[0] = rDesc;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ final ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+ final ByteBuffer frame = ctx.allocateFrame();
+ final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ final InputSource in = new InputSource();
+ final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage();
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ appender.reset(frame, true);
+ writer.open();
+ try {
+ File collectionDirectory = new File(collectionName);
+ File[] list = collectionDirectory.listFiles();
+
+ for (int i = 0; i < list.length; ++i) {
+ // Add the document node to the frame output.
+ if (list[i].getPath().endsWith(".xml")) {
+ abvsFileNode.reset();
+ FunctionHelper.readInDocFromString(list[i].getPath(), in, abvsFileNode);
+
+ tb.reset();
+ tb.addField(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(), abvsFileNode.getLength());
+
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException(
+ "Could not write frame (VXQueryCollectionOperatorDescriptor.createPushRuntime).");
+ }
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java?rev=1400977&r1=1400976&r2=1400977&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java Mon Oct 22 18:10:58 2012
@@ -53,7 +53,14 @@ public class VXQueryMetadataProvider imp
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
throws AlgebricksException {
- return null;
+ VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) dataSource;
+ RecordDescriptor rDesc = new RecordDescriptor(null);
+ IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds.getId(), rDesc);
+
+ String[] locations = new String[1];
+ locations[0] = ds.getId();
+ AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint);
}
@Override