You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2015/11/05 03:55:01 UTC
[15/17] incubator-hawq git commit: HAWQ-100. Code Cleanup:
gpmapreduce.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4e392375/src/bin/gpmapreduce/src/parse.c
----------------------------------------------------------------------
diff --git a/src/bin/gpmapreduce/src/parse.c b/src/bin/gpmapreduce/src/parse.c
deleted file mode 100644
index b7f6c44..0000000
--- a/src/bin/gpmapreduce/src/parse.c
+++ /dev/null
@@ -1,2147 +0,0 @@
-#include <parser.h>
-#include <except.h>
-#include <mapred_errors.h>
-
-#include <stdio.h>
-#include <yaml_parse.h>
-#include <yaml.h>
-
-#include <stdarg.h>
-
-int mapred_verify_object(mapred_parser_t *parser, mapred_object_t *obj);
-
-/* -------------------------------------------------------------------------- */
-int mapred_parse_error(mapred_parser_t *parser, char *fmt, ...)
-{
- mapred_object_t *obj = parser->current_obj;
- va_list arg;
-
- if (parser && parser->current_doc)
- {
- if (global_verbose_flag)
- fprintf(stderr, " - ");
- parser->current_doc->u.document.flags |= mapred_document_error;
- }
- else if (global_verbose_flag)
- fprintf(stderr, " - ");
-
- fprintf(stderr, "Error: ");
- if (obj && obj->name)
- fprintf(stderr, "%s '%s': ", mapred_kind_name[obj->kind], obj->name);
- if (obj && !obj->name)
- fprintf(stderr, "%s: ", mapred_kind_name[obj->kind]);
-
- va_start(arg, fmt);
- vfprintf(stderr, fmt, arg);
- va_end(arg);
- if (parser && parser->event.start_mark.line)
- fprintf(stderr, ", at line %d", (int) parser->event.start_mark.line+1);
- fprintf(stderr, "\n");
-
- return MAPRED_PARSE_ERROR;
-}
-
-
-#define copyscalar(s) \
- strcpy(malloc(strlen(s)+1), s)
-
-mapred_olist_t* mapred_parse_string(unsigned char *yaml)
-{
- mapred_olist_t *documents;
- yaml_parser_t parser;
-
- XASSERT(yaml);
- if (!yaml_parser_initialize(&parser))
- XRAISE(MAPRED_PARSE_INTERNAL,
- "YAML parser initialization failed");
-
- yaml_parser_set_input_string(&parser, yaml, strlen((char*) yaml));
- documents = mapred_parse_yaml(&parser);
- yaml_parser_delete(&parser);
- return documents;
-}
-
-mapred_olist_t* mapred_parse_file(FILE *file)
-{
- mapred_olist_t *documents;
- yaml_parser_t parser;
-
- XASSERT(file);
- if (!yaml_parser_initialize(&parser))
- XRAISE(MAPRED_PARSE_INTERNAL,
- "YAML parser initialization failed");
-
- yaml_parser_set_input_file(&parser, file);
- documents = mapred_parse_yaml(&parser);
- yaml_parser_delete(&parser);
- return documents;
-}
-
-mapred_olist_t* mapred_parse_yaml(yaml_parser_t *yparser)
-{
- mapred_parser_t parser;
- int i;
- int error = 0;
- mapred_olist_t *doc_item;
-
- /* Give us a clean slate */
- memset(&parser, 0, sizeof(parser));
-
- /* Initialize what must be initialized */
-#if USE_FLEX_REENTRANT
- yaml_scalar_yylex_init (&parser.yscanner);
-#endif
- parser.yparser = yparser;
- parser.state = STATE_YAML_PARSE;
- parser.frame = -1;
- for (i = 0; i < MAX_CONTEXT_DEPTH; i++)
- parser.context[i] = CONTEXT_NONE;
-
- /* Call into the parser, detects grammar errors */
- error = yaml_yyparse(&parser);
-
- /* finalize final document */
- parser_add_document(&parser);
-
- /* Cleanup and return */
-#if USE_FLEX_REENTRANT
- yaml_scalar_yylex_destroy(parser.yscanner);
-#endif
-
- /* Check for errors within documents */
- for (doc_item = parser.doclist;
- doc_item && !error;
- doc_item = doc_item->next)
- {
- if (doc_item->object->u.document.flags & mapred_document_error)
- error = true;
- }
-
- /* Cleanup and return */
- if (error)
- {
- mapred_destroy_olist(&parser.doclist);
- XRAISE(MAPRED_PARSE_ERROR, "parse failure");
- }
-
- return parser.doclist;
-}
-
-
-void parser_add_document(mapred_parser_t *parser)
-{
- mapred_olist_t *newitem;
- mapred_olist_t *doclist;
- int error;
-
- if (!parser->current_doc)
- return;
-
- /* Add the last of the documents objects into the document */
- parser_add_object(parser, MAPRED_NO_KIND);
-
- /* Verify the completed document */
- error = mapred_verify_object(parser, parser->current_doc);
- if (error != NO_ERROR)
- parser->current_doc->u.document.flags |= mapred_document_error;
-
- /* Allocate the new list item */
- newitem = malloc(sizeof(mapred_olist_t));
- newitem->object = parser->current_doc;
- newitem->next = (mapred_olist_t *) NULL;
-
- /* Insert it into the last slot of the existing list */
- doclist = parser->doclist;
- while (doclist && doclist->next)
- doclist = doclist->next;
- if (doclist)
- doclist->next = newitem;
- else
- parser->doclist = newitem;
-}
-
-void parser_begin_document(mapred_parser_t *parser)
-{
- /* If there is a current document add it first */
- parser_add_document(parser);
-
- /* Allocate an object for the new document and return */
- parser->current_doc = malloc(sizeof(mapred_object_t));
- memset(parser->current_doc, 0, sizeof(mapred_object_t));
- parser->current_doc->kind = MAPRED_DOCUMENT;
- parser->current_doc->u.document.id = ++parser->doc_number;
- parser->current_doc->line = (int) parser->event.start_mark.line+1;
-
- if (global_verbose_flag)
- fprintf(stderr, " - Parsing YAML Document %d:\n", parser->doc_number);
-}
-
-void parser_begin_define(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
-
- /*
- * The only thing we have to do is ensure that this isn't a duplicate
- * define list.
- */
- if (parser->current_doc->u.document.flags & mapred_document_defines)
- {
- mapred_parse_error(parser, "Duplicate DEFINE list in DOCUMENT");
- return;
- }
-
- parser->current_doc->u.document.flags |= mapred_document_defines;
-}
-
-void parser_begin_execute(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
-
- /*
- * The only thing we have to do is ensure that this isn't a duplicate
- * execution list.
- */
- if (parser->current_doc->u.document.flags & mapred_document_executes)
- {
- mapred_parse_error(parser, "Duplicate EXECUTE list in DOCUMENT");
- return;
- }
-
- parser->current_doc->u.document.flags |= mapred_document_executes;
-}
-
-void parser_set_version(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
-
- if (parser->current_doc->u.document.version)
- {
- mapred_parse_error(parser, "Duplicate Version: %s", value);
- return;
- }
-
- /*
- * We have already assured that the value matches a good regex,
- * but we must still validate that the version itself is supported.
- */
- if (strcmp(value, "1.0.0.1") < 0 || strcmp(value, "1.0.0.3") > 0)
- {
- mapred_parse_error(parser, "Unrecognized VERSION");
- }
-
- parser->current_doc->u.document.version = copyscalar(value);
-}
-
-void parser_set_database(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- if (parser->current_doc->u.document.database)
- {
- mapred_parse_error(parser, "Duplicate Database: %s", value);
- return;
- }
- parser->current_doc->u.document.database = copyscalar(value);
-}
-
-void parser_set_user(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- if (parser->current_doc->u.document.user)
- {
- mapred_parse_error(parser, "Duplicate User: %s", value);
- return;
- }
- parser->current_doc->u.document.user = copyscalar(value);
-}
-
-void parser_set_host(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- if (parser->current_doc->u.document.host)
- {
- mapred_parse_error(parser, "Duplicate Host: %s", value);
- return;
- }
- parser->current_doc->u.document.host = copyscalar(value);
-}
-
-void parser_set_port(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- if (parser->current_doc->u.document.port > 0)
- {
- mapred_parse_error(parser, "Duplicate Port: %s", value);
- return;
- }
-
- /*
- * The parse has already assured that the value consists of a sequence
- * of digits, so strtol should convert successfully.
- */
- parser->current_doc->u.document.port = (int) strtol(value, NULL, 10);
-}
-
-
-/*
- * parser_add_object - Create a new empty object for the current document.
- */
-void parser_add_object(mapred_parser_t *parser, mapred_kind_t kind)
-{
- int error;
-
- XASSERT(parser->current_doc);
-
- /*
- * If we have a current object then verify it and add it into the
- * document's object list.
- */
- if (parser->current_obj)
- {
- mapred_olist_t *newitem;
- mapred_olist_t *objlist;
-
- /* Validate the finished object */
- error = mapred_verify_object(parser, parser->current_obj);
- if (error != NO_ERROR)
- {
- mapred_destroy_object(&parser->current_obj);
- parser->current_doc->u.document.flags |=
- mapred_document_error;
- }
- else
- {
-
- /* Allocate the new list item */
- newitem = malloc(sizeof(mapred_olist_t));
- newitem->object = parser->current_obj;
- newitem->next = (mapred_olist_t *) NULL;
-
- /* Insert it into the last slot of the existing list */
- objlist = parser->current_doc->u.document.objects;
- while (objlist && objlist->next)
- objlist = objlist->next;
- if (objlist)
- objlist->next = newitem;
- else
- parser->current_doc->u.document.objects = newitem;
-
- if (global_verbose_flag)
- {
- const char *type, *name;
- XASSERT (newitem->object->kind > 0 &&
- newitem->object->kind <= MAPRED_MAXKIND);
-
-
- type = mapred_kind_name[newitem->object->kind];
- name = newitem->object->name;
- if (name)
- fprintf(stderr, " - %s: %s\n", type, name);
- else
- fprintf(stderr, " - %s\n", type);
- }
- }
- }
-
- /*
- * If 'kind' is 'NO_KIND' then we just add in the current object
- * (above) and do not create a new one. We call it this way once
- * at the end to add the last object into the current document.
- */
- if (kind == MAPRED_NO_KIND)
- {
- parser->current_obj = (mapred_object_t *) NULL;
- return;
- }
-
- /* Allocate a new empyt object of the correct kind and return. */
- parser->current_obj = malloc(sizeof(mapred_object_t));
- memset(parser->current_obj, 0, sizeof(mapred_object_t));
- parser->current_obj->kind = kind;
- parser->current_obj->line = (int) parser->event.start_mark.line+1;
-}
-
-
-
-void parser_add_run(mapred_parser_t *parser)
-{
- /*
- * Execution objects just re-use the 'task' structure. The only
- * differences are that:
- * Execution objects get RUN
- * Execution objects do not require (or support) a NAME
- */
- parser_add_object(parser, MAPRED_EXECUTION);
- parser->current_obj->u.task.execute = true;
-}
-
-
-
-void parser_set_name(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT ||
- parser->current_obj->kind == MAPRED_OUTPUT ||
- parser->current_obj->kind == MAPRED_MAPPER ||
- parser->current_obj->kind == MAPRED_TRANSITION ||
- parser->current_obj->kind == MAPRED_COMBINER ||
- parser->current_obj->kind == MAPRED_FINALIZER ||
- parser->current_obj->kind == MAPRED_REDUCER ||
- parser->current_obj->kind == MAPRED_TASK);
-
- /* If this is an invalid name => throw an error */
- if (!value || strlen(value) == 0)
- {
- value = "?";
- mapred_parse_error(parser, "Invalid NAME", value);
- }
-
- /* If the object already has a name => throw an error */
- if (parser->current_obj->name)
- {
- mapred_parse_error(parser, "Duplicate NAME", value);
- return;
- }
-
- parser->current_obj->name = copyscalar(value);
-}
-
-void parser_set_table(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT ||
- parser->current_obj->kind == MAPRED_OUTPUT);
-
- if (!value || strlen(value) == 0)
- {
- value = "";
- mapred_parse_error(parser, "Invalid TABLE");
- }
-
- if (parser->current_obj->kind == MAPRED_INPUT)
- {
- if (!value || strlen(value) == 0)
- {
- if (parser->current_obj->u.input.type == MAPRED_INPUT_NONE)
- parser->current_obj->u.input.type = MAPRED_INPUT_TABLE;
- mapred_parse_error(parser, "Invalid TABLE");
- return;
- }
-
- if (parser->current_obj->u.input.type != MAPRED_INPUT_NONE)
- {
- switch (parser->current_obj->u.input.type)
- {
- case MAPRED_INPUT_TABLE:
- mapred_parse_error(parser,
- "Duplicate TABLE");
- return;
- case MAPRED_INPUT_FILE:
- mapred_parse_error(parser,
- "FILE is incompatible with TABLE");
- return;
- case MAPRED_INPUT_GPFDIST:
- mapred_parse_error(parser,
- "GPFDIST is incompatible with TABLE");
- return;
- case MAPRED_INPUT_QUERY:
- mapred_parse_error(parser,
- "QUERY is incompatible with TABLE");
- return;
- case MAPRED_INPUT_EXEC:
- mapred_parse_error(parser,
- "GPFDIST is incompatible with TABLE");
- return;
- default:
- XASSERT(false);
- }
- }
- parser->current_obj->u.input.type = MAPRED_INPUT_TABLE;
- parser->current_obj->u.input.desc = copyscalar(value);
- }
- else
- {
- if (!value || strlen(value) == 0)
- {
- if (parser->current_obj->u.output.type == MAPRED_OUTPUT_NONE)
- parser->current_obj->u.output.type = MAPRED_OUTPUT_TABLE;
- mapred_parse_error(parser, "Invalid TABLE");
- return;
- }
-
- if (parser->current_obj->u.output.type != MAPRED_OUTPUT_NONE)
- {
- switch (parser->current_obj->u.output.type)
- {
- case MAPRED_OUTPUT_TABLE:
- mapred_parse_error(parser,
- "Duplicate TABLE");
- return;
- case MAPRED_OUTPUT_FILE:
- mapred_parse_error(parser,
- "FILE is incompatible with TABLE");
- return;
- default:
- XASSERT(false);
- }
- }
- parser->current_obj->u.output.type = MAPRED_OUTPUT_TABLE;
- parser->current_obj->u.output.desc = copyscalar(value);
- }
-}
-
-void parser_set_query(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (!value || strlen(value) == 0)
- {
- value = "";
- mapred_parse_error(parser, "Invalid QUERY");
- }
-
- if (parser->current_obj->u.input.type != MAPRED_INPUT_NONE)
- {
- if (parser->current_obj->u.input.type == MAPRED_INPUT_QUERY)
- {
- mapred_parse_error(parser, "Duplicate QUERY for INPUT");
- return;
- }
- else
- {
- mapred_parse_error(parser, "INPUT may only specify one of "
- "FILE, GPFDIST, TABLE, QUERY, EXEC");
- return;
- }
- }
- parser->current_obj->u.input.type = MAPRED_INPUT_QUERY;
- parser->current_obj->u.input.desc = copyscalar(value);
-}
-
-void parser_set_exec(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (!value || strlen(value) == 0)
- {
- value = "";
- mapred_parse_error(parser, "Invalid EXEC");
- }
-
- if (parser->current_obj->u.input.type != MAPRED_INPUT_NONE)
- {
- if (parser->current_obj->u.input.type == MAPRED_INPUT_EXEC)
- {
- mapred_parse_error(parser, "Duplicate EXEC for INPUT");
- return;
- }
- else
- {
- mapred_parse_error(parser, "INPUT may only specify one of "
- "FILE, GPFDIST, TABLE, QUERY, EXEC");
- return;
- }
- }
- parser->current_obj->u.input.type = MAPRED_INPUT_EXEC;
- parser->current_obj->u.input.desc = copyscalar(value);
-}
-
-void parser_set_format(mapred_parser_t *parser, char *value)
-{
- mapred_format_t format;
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
-
- if (value && !strcasecmp(value, "text"))
- format = MAPRED_FORMAT_TEXT;
- else if (value && !strcasecmp(value, "csv"))
- format = MAPRED_FORMAT_CSV;
- else
- format = MAPRED_FORMAT_INVALID;
-
- switch (parser->current_obj->kind)
- {
- case MAPRED_INPUT:
- if (format == MAPRED_FORMAT_INVALID)
- mapred_parse_error(parser, "Duplicate FORMAT");
- if (parser->current_obj->u.input.format != MAPRED_FORMAT_NONE)
- {
- format = MAPRED_FORMAT_INVALID;
- mapred_parse_error(parser, "Duplicate FORMAT");
- }
- parser->current_obj->u.input.format = format;
- return;
-
- case MAPRED_OUTPUT:
- if (format == MAPRED_FORMAT_INVALID)
- mapred_parse_error(parser, "Duplicate FORMAT");
- if (parser->current_obj->u.output.format != MAPRED_FORMAT_NONE)
- {
- format = MAPRED_FORMAT_INVALID;
- mapred_parse_error(parser, "Duplicate FORMAT");
- }
- parser->current_obj->u.output.format = format;
- return;
-
- default:
- XASSERT(false);
- }
-}
-
-void parser_set_delimiter(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
-
- if (!value || strlen(value) == 0)
- {
- value = "";
- mapred_parse_error(parser, "Invalid DELIMITER");
- }
-
- switch (parser->current_obj->kind)
- {
- case MAPRED_INPUT:
- if (parser->current_obj->u.input.delimiter)
- {
- mapred_parse_error(parser, "Duplicate DELIMITER");
- return;
- }
- parser->current_obj->u.input.delimiter = copyscalar(value);
- return;
-
- case MAPRED_OUTPUT:
- if (parser->current_obj->u.output.delimiter)
- {
- mapred_parse_error(parser, "Duplicate DELIMITER");
- return;
- }
- parser->current_obj->u.output.delimiter = copyscalar(value);
- return;
-
- default:
- XASSERT(false);
- }
-}
-
-void parser_set_escape(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (!value || strlen(value) == 0)
- {
- value = "";
- mapred_parse_error(parser, "Invalid ESCAPE");
- }
- if (parser->current_obj->u.input.escape)
- {
- mapred_parse_error(parser, "Duplicate ESCAPE");
- return;
- }
- parser->current_obj->u.input.escape = copyscalar(value);
-}
-
-
-void parser_set_null(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (!value || strlen(value) == 0)
- {
- parser->current_obj->u.input.null = copyscalar("");
- mapred_parse_error(parser, "Invalid NULL");
- return;
- }
- if (parser->current_obj->u.input.null)
- {
- mapred_parse_error(parser, "Duplicate NULL");
- return;
- }
- parser->current_obj->u.input.null = copyscalar(value);
-}
-
-void parser_set_quote(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (!value || strlen(value) == 0)
- {
- parser->current_obj->u.input.quote = copyscalar("");
- mapred_parse_error(parser, "Invalid QUOTE");
- return;
- }
- if (parser->current_obj->u.input.quote)
- {
- mapred_parse_error(parser, "Duplicate QUOTE");
- return;
- }
- parser->current_obj->u.input.quote = copyscalar(value);
-}
-
-
-void parser_set_encoding(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (!value || strlen(value) == 0)
- {
- parser->current_obj->u.input.encoding = copyscalar("");
- mapred_parse_error(parser, "Invalid ENCODING");
- return;
- }
- if (parser->current_obj->u.input.encoding)
- {
- mapred_parse_error(parser, "Duplicate ENCODING");
- return;
- }
- parser->current_obj->u.input.encoding = copyscalar(value);
-}
-
-void parser_set_error_limit(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (!value || strlen(value) == 0)
- {
- parser->current_obj->u.input.error_limit = -1;
- mapred_parse_error(parser, "Invalid ERROR_LIMIT");
- return;
- }
- if (parser->current_obj->u.input.error_limit > 0)
- {
- mapred_parse_error(parser, "Duplicate ERROR_LIMIT");
- return;
- }
-
- /*
- * The parse has already assured that the value consists of a sequence
- * of digits, so strtol should convert successfully.
- */
- parser->current_obj->u.input.error_limit = (int) strtol(value, NULL, 10);
-}
-
-
-void parser_set_mode(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
-
- switch (parser->current_obj->kind)
- {
- case MAPRED_MAPPER:
- case MAPRED_TRANSITION:
- case MAPRED_COMBINER:
- case MAPRED_FINALIZER:
- {
- mapred_mode_t mode;
-
- /* Convert input string into a valid mode */
- if (value && !strcasecmp(value, "single"))
- mode = MAPRED_MODE_SINGLE;
- else if (value && !strcasecmp(value, "multi"))
- mode = MAPRED_MODE_MULTI;
- else
- mode = MAPRED_MODE_INVALID;
-
- /* Only MAP and FINALIZE support MULTI mode */
- if (mode == MAPRED_MODE_MULTI &&
- parser->current_obj->kind != MAPRED_MAPPER &&
- parser->current_obj->kind != MAPRED_FINALIZER)
- {
- mode = MAPRED_MODE_INVALID;
- }
-
- /* Error for invalid or duplicate modes */
- if (mode == MAPRED_MODE_INVALID)
- {
- mapred_parse_error(parser, "Invalid MODE");
- }
- if (parser->current_obj->u.function.mode != MAPRED_MODE_NONE)
- {
- mode = MAPRED_MODE_INVALID;
- mapred_parse_error(parser, "Duplicate MODE");
- }
-
- /* Set mode and return */
- parser->current_obj->u.function.mode = mode;
- return;
- }
-
- case MAPRED_OUTPUT:
- {
- mapred_output_mode_t mode;
-
- /* Convert input string into a valid mode */
- if (value && !strcasecmp(value, "replace"))
- mode = MAPRED_OUTPUT_MODE_REPLACE;
- else if (value && !strcasecmp(value, "append"))
- mode = MAPRED_OUTPUT_MODE_APPEND;
- else
- mode = MAPRED_OUTPUT_MODE_INVALID;
-
- /* Error for invalid or duplicate modes */
- if (mode == MAPRED_OUTPUT_MODE_INVALID)
- {
- mapred_parse_error(parser, "Invalid MODE");
- }
- if (parser->current_obj->u.output.mode != MAPRED_OUTPUT_MODE_NONE)
- {
- mode = MAPRED_OUTPUT_MODE_INVALID;
- mapred_parse_error(parser, "Duplicate MODE");
- }
-
- /* Set mode and return */
- parser->current_obj->u.output.mode = mode;
- return;
- }
-
- default:
- XASSERT(false); /* ONLY functions and OUTPUTS have modes */
- }
-}
-
-void parser_set_file(mapred_parser_t *parser, char *value)
-{
- /*
- * Only applies to OUTPUTS which have a single file.
- * INPUTS use parser_begin_files, parser_add_file ...
- */
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_OUTPUT);
-
- switch (parser->current_obj->u.output.type)
- {
- case MAPRED_OUTPUT_NONE:
- parser->current_obj->u.output.type = MAPRED_OUTPUT_FILE;
- if (!value || strlen(value) == 0)
- {
- mapred_parse_error(parser, "Invalid FILE");
- return;
- }
- parser->current_obj->u.output.desc = copyscalar(value);
- break;
-
- case MAPRED_OUTPUT_FILE:
- {
- mapred_parse_error(parser, "Duplicate FILE");
- return;
- }
-
- case MAPRED_OUTPUT_TABLE:
- {
- mapred_parse_error(parser, "TABLE is incompatible with FILE");
- return;
- }
-
- default:
- XASSERT(false);
- }
-}
-
-void parser_set_transition(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_REDUCER);
-
- if (parser->current_obj->u.reducer.transition.name)
- {
- mapred_parse_error(parser, "Duplicate TRANSITION for REDUCE");
- return;
- }
- parser->current_obj->u.reducer.transition.name = copyscalar(value);
-}
-
-void parser_set_combiner(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_REDUCER);
-
- if (parser->current_obj->u.reducer.combiner.name)
- {
- mapred_parse_error(parser, "Duplicate CONSOLIDATE for REDUCE");
- return;
- }
- parser->current_obj->u.reducer.combiner.name = copyscalar(value);
-}
-
-void parser_set_finalizer(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_REDUCER);
-
- if (parser->current_obj->u.reducer.finalizer.name)
- {
- mapred_parse_error(parser, "Duplicate FINALIZE for REDUCE");
- return;
- }
- parser->current_obj->u.reducer.finalizer.name = copyscalar(value);
-}
-
-void parser_set_initialize(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_REDUCER);
-
- if (parser->current_obj->u.reducer.initialize)
- {
- mapred_parse_error(parser, "Duplicate INITIALIZE for REDUCE");
- return;
- }
- parser->current_obj->u.reducer.initialize = copyscalar(value);
-}
-
-
-void parser_set_language(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
- parser->current_obj->kind == MAPRED_TRANSITION ||
- parser->current_obj->kind == MAPRED_COMBINER ||
- parser->current_obj->kind == MAPRED_FINALIZER);
-
- if (!value || strlen(value) == 0)
- {
- value = "";
- mapred_parse_error(parser, "Invalid LANGUAGE");
- }
- if (parser->current_obj->u.function.language)
- {
- mapred_parse_error(parser, "Duplicate LANGUAGE");
- return;
- }
- parser->current_obj->u.function.language = copyscalar(value);
-}
-
-void parser_set_function(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
- parser->current_obj->kind == MAPRED_TRANSITION ||
- parser->current_obj->kind == MAPRED_COMBINER ||
- parser->current_obj->kind == MAPRED_FINALIZER);
-
- if (!value || strlen(value) == 0)
- {
- value = "";
- mapred_parse_error(parser, "Invalid FUNCTION");
- }
- if (parser->current_obj->u.function.body)
- {
- mapred_parse_error(parser, "Duplicate FUNCTION");
- return;
- }
- parser->current_obj->u.function.body = copyscalar(value);
-
-
- /*
- * The "start_mark" of function body has a line number, but what that line
- * number refers to is a bit finicky depending on the nature of the YAML.
- * So we take it and adjust it accordingly.
- */
- parser->current_obj->u.function.lineno = parser->event.start_mark.line;
- switch (parser->event.data.scalar.style)
- {
- case YAML_LITERAL_SCALAR_STYLE:
- case YAML_FOLDED_SCALAR_STYLE:
- parser->current_obj->u.function.lineno += 2;
- break;
-
- case YAML_PLAIN_SCALAR_STYLE:
- case YAML_SINGLE_QUOTED_SCALAR_STYLE:
- case YAML_DOUBLE_QUOTED_SCALAR_STYLE:
- parser->current_obj->u.function.lineno += 1;
- break;
- default:
- break;
- }
-}
-
-/*
- * parser_set_library was added to support the "LIBRARY" option in mapreduce
- * yaml schema version 1.0.0.2. This is used by C language functions to
- * specify which code library the C function is defined in.
- *
- * - MAP:
- * ...
- * LIBRARY: $libdir/libfoo
- * FUNCTION: myFunc
- */
-void parser_set_library(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
- parser->current_obj->kind == MAPRED_TRANSITION ||
- parser->current_obj->kind == MAPRED_COMBINER ||
- parser->current_obj->kind == MAPRED_FINALIZER);
-
- if (!value || strlen(value) == 0)
- {
- value = "";
- mapred_parse_error(parser, "Invalid LIBRARY");
- }
- if (parser->current_obj->u.function.library)
- {
- mapred_parse_error(parser, "Duplicate LIBRARY");
- return;
- }
- parser->current_obj->u.function.library = copyscalar(value);
-
- /*
- * We will validate that the document version is >= 1.0.0.2
- * durring object verification.
- */
-}
-
-void parser_set_optimize(mapred_parser_t *parser, char *value)
-{
- /* FIXME */
-}
-
-
-void parser_set_source(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
-
- switch (parser->current_obj->kind)
- {
- case MAPRED_TASK:
- if (parser->current_obj->u.task.input.name)
- {
- mapred_parse_error(parser, "Duplicate SOURCE for TASK");
- return;
- }
- parser->current_obj->u.task.input.name = copyscalar(value);
- break;
-
- case MAPRED_EXECUTION:
- if (parser->current_obj->u.task.input.name)
- {
- mapred_parse_error(parser, "Duplicate SOURCE for RUN");
- return;
- }
- parser->current_obj->u.task.input.name = copyscalar(value);
- break;
-
- default:
- XASSERT(false);
- }
-}
-
-void parser_set_target(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_EXECUTION);
-
- if (parser->current_obj->u.task.output.name)
- {
- mapred_parse_error(parser, "Duplicate TARGET for RUN");
- return;
- }
- parser->current_obj->u.task.output.name = copyscalar(value);
-}
-
-void parser_set_mapper(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
-
- switch (parser->current_obj->kind)
- {
- case MAPRED_TASK:
- if (parser->current_obj->u.task.mapper.name)
- {
- mapred_parse_error(parser, "Duplicate MAP for TASK");
- return;
- }
- parser->current_obj->u.task.mapper.name = copyscalar(value);
- break;
-
- case MAPRED_EXECUTION:
- if (parser->current_obj->u.task.mapper.name)
- {
- mapred_parse_error(parser, "Duplicate MAP for RUN");
- return;
- }
- parser->current_obj->u.task.mapper.name = copyscalar(value);
- break;
-
- default:
- XASSERT(false);
- }
-}
-
-void parser_set_reducer(mapred_parser_t *parser, char *value)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
-
- switch (parser->current_obj->kind)
- {
- case MAPRED_TASK:
- if (parser->current_obj->u.task.reducer.name)
- {
- mapred_parse_error(parser, "Duplicate REDUCE for TASK");
- return;
- }
- parser->current_obj->u.task.reducer.name = copyscalar(value);
- break;
-
- case MAPRED_EXECUTION:
- if (parser->current_obj->u.task.reducer.name)
- {
- mapred_parse_error(parser, "Duplicate REDUCE for RUN");
- return;
- }
- parser->current_obj->u.task.reducer.name = copyscalar(value);
- break;
-
- default:
- XASSERT(false);
- }
-}
-
-void parser_begin_ordering(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_REDUCER);
-
- /*
- * We will validate that the document version is >= 1.0.0.3
- * durring object verification.
- */
- if (parser->current_obj->u.reducer.ordering)
- {
- mapred_parse_error(parser, "Duplicate ORDERING for REDUCER");
- return;
- }
-}
-
-void parser_add_ordering(mapred_parser_t *parser, char *value)
-{
- mapred_clist_t *newitem;
- mapred_clist_t *clist;
-
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_REDUCER);
-
- /*
- * Validate ordering:
- * In general ordering can be an arbitrary expression so it is
- * difficult to verify easily. If we need more verification it
- * makes sense to push that verification into the grammar.
- */
- if (!value || strlen(value) == 0)
- {
- mapred_parse_error(parser, "Invalid ORDERING");
- return;
- }
-
- /* Allocate the new list item */
- newitem = malloc(sizeof(mapred_clist_t));
- newitem->value = copyscalar(value);
- newitem->next = (mapred_clist_t *) NULL;
-
- /* Add the new item into the last slot of the list */
- clist = parser->current_obj->u.reducer.ordering;
- if (clist == NULL)
- parser->current_obj->u.reducer.ordering = newitem;
- else
- {
- while (clist && clist->next)
- clist = clist->next;
- clist->next = newitem;
- }
-}
-
-
-/* List functions */
-void parser_begin_files(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (parser->current_obj->u.input.type != MAPRED_INPUT_NONE)
- {
- if (parser->current_obj->u.input.type == MAPRED_INPUT_FILE)
- {
- mapred_parse_error(parser, "Duplicate FILE for INPUT");
- return;
- }
- else
- {
- mapred_parse_error(parser, "INPUT may only specify one of "
- "FILE, GPFDIST, TABLE, QUERY, EXEC");
- return;
- }
- }
-
- /* files will be added individually */
- parser->current_obj->u.input.type = MAPRED_INPUT_FILE;
-}
-
-void parser_begin_gpfdist(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- if (parser->current_obj->u.input.type != MAPRED_INPUT_NONE)
- {
- if (parser->current_obj->u.input.type == MAPRED_INPUT_GPFDIST)
- {
- mapred_parse_error(parser, "Duplicate GPFDIST for INPUT");
- return;
- }
- else
- {
- mapred_parse_error(parser, "INPUT may only specify one of "
- "FILE, GPFDIST, TABLE, QUERY, EXEC");
- return;
- }
- }
- parser->current_obj->u.input.type = MAPRED_INPUT_GPFDIST;
-}
-
-void parser_begin_columns(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
- if (parser->current_obj->u.input.columns)
- {
- mapred_parse_error(parser, "Duplicate COLUMNS for INPUT");
- return;
- }
-}
-
-void parser_begin_parameters(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- switch (parser->current_obj->kind)
- {
- case MAPRED_MAPPER:
- if (parser->current_obj->u.function.parameters)
- {
- mapred_parse_error(parser, "Duplicate PARAMETERS for MAP");
- return;
- }
- break;
-
- case MAPRED_TRANSITION:
- if (parser->current_obj->u.function.parameters)
- {
- mapred_parse_error(parser, "Duplicate PARAMETERS for TRANSITION");
- return;
- }
- break;
-
- case MAPRED_COMBINER:
- if (parser->current_obj->u.function.parameters)
- {
- mapred_parse_error(parser, "Duplicate PARAMETERS for CONSOLIDATE");
- return;
- }
- break;
-
- case MAPRED_FINALIZER:
- if (parser->current_obj->u.function.parameters)
- {
- mapred_parse_error(parser, "Duplicate PARAMETERS for FINALIZE");
- return;
- }
- break;
-
- default:
- XASSERT(false);
- }
-}
-
-void parser_begin_returns(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- switch (parser->current_obj->kind)
- {
- case MAPRED_MAPPER:
- if (parser->current_obj->u.function.returns)
- {
- mapred_parse_error(parser, "Duplicate RETURNS for MAP");
- return;
- }
- break;
-
- case MAPRED_TRANSITION:
- if (parser->current_obj->u.function.returns)
- {
- mapred_parse_error(parser, "Duplicate RETURNS for TRANSITION");
- return;
- }
- break;
-
- case MAPRED_COMBINER:
- if (parser->current_obj->u.function.returns)
- {
- mapred_parse_error(parser, "Duplicate RETURNS for CONSOLIDATE");
- return;
- }
- break;
-
- case MAPRED_FINALIZER:
- if (parser->current_obj->u.function.returns)
- {
- mapred_parse_error(parser, "Duplicate RETURNS for FINALIZE");
- return;
- }
- break;
-
- default:
- XASSERT(false);
- }
-}
-
-void parser_begin_keys(mapred_parser_t *parser)
-{
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_REDUCER);
- if (parser->current_obj->u.reducer.keys)
- {
- mapred_parse_error(parser, "Duplicate KEYS for REDUCER");
- return;
- }
-}
-
-void parser_add_file(mapred_parser_t *parser, char *value)
-{
- mapred_clist_t *newitem;
- mapred_clist_t *clist;
-
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
- XASSERT(parser->current_obj->u.input.type == MAPRED_INPUT_FILE ||
- parser->current_obj->u.input.type == MAPRED_INPUT_GPFDIST);
-
- /* Verify the new file */
- if (!value || strlen(value) == 0)
- {
- switch (parser->current_obj->u.input.type)
- {
- case MAPRED_INPUT_FILE:
- mapred_parse_error(parser, "Invalid FILE");
- return;
- case MAPRED_INPUT_GPFDIST:
- mapred_parse_error(parser, "Invalid GPFDIST");
- return;
- default:
- XASSERT(false);
- }
- }
- /* Todo: improved regex checking on files */
-
- /* Allocate the new list item */
- newitem = malloc(sizeof(mapred_clist_t));
- newitem->value = copyscalar(value);
- newitem->next = (mapred_clist_t *) NULL;
-
- /* Add the new item into the last slot of the list */
- clist = parser->current_obj->u.input.files;
- while (clist && clist->next)
- clist = clist->next;
- if (clist)
- clist->next = newitem;
- else
- parser->current_obj->u.input.files = newitem;
-}
-
-void parser_add_column(mapred_parser_t *parser, char *value)
-{
- mapred_plist_t *newitem;
- mapred_plist_t *plist;
- char *name, *type, *tokenizer;
-
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_INPUT);
-
- /*
- * Verify the new column
- * It should be in one of two forms:
- * 1) <name>
- * 2) <name> <datatype>
- */
- if (!value || strlen(value) == 0)
- {
- mapred_parse_error(parser, "Invalid COLUMNS");
- return;
- }
- name = strtok_r(value, " \t\r", &tokenizer);
- type = strtok_r(NULL, " \t\r", &tokenizer);
- if (!type)
- type = "text"; /* type defaults to 'text' */
-
- /* double check that there's nothing else */
- if (strtok_r(NULL, " \t\r", &tokenizer))
- {
- mapred_parse_error(parser, "Invalid COLUMNS");
- return;
- }
-
- /* Allocate the new list item */
- newitem = malloc(sizeof(mapred_plist_t));
- newitem->name = copyscalar(name);
- newitem->type = copyscalar(type);
- newitem->next = (mapred_plist_t *) NULL;
-
- /* Add the new item into the last slot of the list */
- plist = parser->current_obj->u.input.columns;
- while (plist && plist->next)
- plist = plist->next;
- if (plist)
- plist->next = newitem;
- else
- parser->current_obj->u.input.columns = newitem;
-}
-
-void parser_add_parameter(mapred_parser_t *parser, char *value)
-{
- mapred_plist_t *newitem;
- mapred_plist_t *plist;
- char *name, *type, *tokenizer;
-
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
- parser->current_obj->kind == MAPRED_TRANSITION ||
- parser->current_obj->kind == MAPRED_COMBINER ||
- parser->current_obj->kind == MAPRED_FINALIZER);
-
- /*
- * Verify the new parameter
- * It should be in one of two forms:
- * 1) <name>
- * 2) <name> <datatype>
- */
- if (!value || strlen(value) == 0)
- {
- mapred_parse_error(parser, "Invalid PARAMETERS");
- return;
- }
- name = strtok_r(value, " \t\r", &tokenizer);
- type = strtok_r(NULL, " \t\r", &tokenizer);
- if (!type)
- type = "text"; /* type defaults to 'text' */
-
- /* double check that there's nothing else */
- if (strtok_r(NULL, " \t\r", &tokenizer))
- {
- mapred_parse_error(parser, "Invalid PARAMETERS");
- return;
- }
-
- /* Allocate the new list item */
- newitem = malloc(sizeof(mapred_plist_t));
- newitem->name = copyscalar(name);
- newitem->type = copyscalar(type);
- newitem->next = (mapred_plist_t *) NULL;
-
- /* Add the new item into the last slot of the list */
- plist = parser->current_obj->u.function.parameters;
- while (plist && plist->next)
- plist = plist->next;
- if (plist)
- plist->next = newitem;
- else
- parser->current_obj->u.function.parameters = newitem;
-}
-
-void parser_add_return(mapred_parser_t *parser, char *value)
-{
- mapred_plist_t *newitem;
- mapred_plist_t *plist;
- char *name, *type, *tokenizer;
-
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
- parser->current_obj->kind == MAPRED_TRANSITION ||
- parser->current_obj->kind == MAPRED_COMBINER ||
- parser->current_obj->kind == MAPRED_FINALIZER);
-
- /*
- * Verify the new return
- * It should be in one of two forms:
- * 1) <name>
- * 2) <name> <datatype>
- */
- if (!value || strlen(value) == 0)
- {
- mapred_parse_error(parser, "Invalid RETURNS");
- return;
- }
- name = strtok_r(value, " \t\r", &tokenizer);
- type = strtok_r(NULL, " \t\r", &tokenizer);
- if (!type)
- type = "text"; /* type defaults to 'text' */
-
- /* double check that there's nothing else */
- if (strtok_r(NULL, " \t\r", &tokenizer))
- {
- mapred_parse_error(parser, "Invalid RETURNS");
- return;
- }
-
- /* Allocate the new list item */
- newitem = malloc(sizeof(mapred_plist_t));
- newitem->name = copyscalar(name);
- newitem->type = copyscalar(type);
- newitem->next = (mapred_plist_t *) NULL;
-
- /* Add the new item into the last slot of the list */
- plist = parser->current_obj->u.function.returns;
- while (plist && plist->next)
- plist = plist->next;
- if (plist)
- plist->next = newitem;
- else
- parser->current_obj->u.function.returns = newitem;
-}
-
-
-void parser_add_key(mapred_parser_t *parser, char *value)
-{
- mapred_clist_t *newitem;
- mapred_clist_t *clist;
-
- XASSERT(parser->current_doc);
- XASSERT(parser->current_obj);
- XASSERT(parser->current_obj->kind == MAPRED_REDUCER);
-
- /* Validate key */
- if (!value || strlen(value) == 0)
- {
- mapred_parse_error(parser, "Invalid KEYS");
- return;
- }
-
- /* Allocate the new list item */
- newitem = malloc(sizeof(mapred_clist_t));
- newitem->value = copyscalar(value);
- newitem->next = (mapred_clist_t *) NULL;
-
- /* Add the new item into the last slot of the list */
- clist = parser->current_obj->u.reducer.keys;
- while (clist && clist->next)
- clist = clist->next;
- if (clist)
- clist->next = newitem;
- else
- parser->current_obj->u.reducer.keys = newitem;
-}
-
-
-
-/*
- * mapred_dump_yaml - Given an object, dump it's YAML representation.
- * This is the inverse of parsing
- *
- * (*) Could be re-written to avoid code duplication issues.
- */
-void mapred_dump_yaml(mapred_object_t *obj)
-{
- char *ckind = NULL;
-
- if (!obj)
- return;
-
- switch (obj->kind)
- {
- case MAPRED_DOCUMENT:
- printf("---\n");
- /* Dumping the current version */
- printf("VERSION: 1.0.0.3\n");
- if (obj->u.document.database)
- printf("DATABASE: %s\n", obj->u.document.database);
- if (obj->u.document.user)
- printf("USER: %s\n", obj->u.document.user);
- if (obj->u.document.host)
- printf("HOST: %s\n", obj->u.document.host);
- if (obj->u.document.port > 0)
- printf("PORT: %d\n", obj->u.document.port);
- if (obj->u.document.flags & mapred_document_defines)
- {
- mapred_olist_t *sub;
- printf("DEFINE:\n");
- for (sub = obj->u.document.objects; sub; sub = sub->next)
- if (sub->object->kind != MAPRED_EXECUTION)
- mapred_dump_yaml(sub->object);
- }
- if (obj->u.document.flags & mapred_document_executes)
- {
- mapred_olist_t *sub;
- printf("EXECUTE:\n");
- for (sub = obj->u.document.objects; sub; sub = sub->next)
- if (sub->object->kind == MAPRED_EXECUTION)
- mapred_dump_yaml(sub->object);
- }
- break;
-
- case MAPRED_INPUT:
- printf(" - INPUT:\n");
- if (obj->name)
- printf(" NAME: %s\n", obj->name);
- if (obj->u.input.columns)
- {
- mapred_plist_t *plist;
- printf(" COLUMNS:\n");
- for (plist = obj->u.input.columns; plist; plist = plist->next)
- printf(" - %s %s\n", plist->name, plist->type);
- }
- if (obj->u.input.delimiter)
- printf(" DELIMITER: %s\n", obj->u.input.delimiter);
- if (obj->u.input.encoding)
- printf(" ENCODING: %s\n", obj->u.input.encoding);
- switch (obj->u.input.format)
- {
- case MAPRED_FORMAT_NONE:
- break;
- case MAPRED_FORMAT_TEXT:
- printf(" FORMAT: TEXT\n");
- break;
- case MAPRED_FORMAT_CSV:
- printf(" FORMAT: CSV\n");
- break;
- default:
- XASSERT(false);
- }
- switch (obj->u.input.type)
- {
- case MAPRED_INPUT_NONE:
- break;
- case MAPRED_INPUT_FILE:
- {
- mapred_clist_t *clist;
- printf(" FILE:\n");
- for (clist = obj->u.input.files; clist; clist = clist->next)
- printf(" - %s\n", clist->value);
- break;
- }
- case MAPRED_INPUT_GPFDIST:
- printf(" GPFDIST: %s\n", obj->u.input.desc);
- break;
- case MAPRED_INPUT_TABLE:
- printf(" TABLE: %s\n", obj->u.input.desc);
- break;
- case MAPRED_INPUT_QUERY:
- printf(" QUERY: |\n");
- printf(" %s\n", obj->u.input.desc);
- break;
- case MAPRED_INPUT_EXEC:
- printf(" EXEC: %s\n", obj->u.input.desc);
- break;
- default:
- XASSERT(false);
- }
- break;
-
- case MAPRED_OUTPUT:
- printf(" - OUTPUT:\n");
- if (obj->name)
- printf(" NAME: %s\n", obj->name);
- switch (obj->u.output.mode)
- {
- case MAPRED_OUTPUT_MODE_NONE:
- break;
- case MAPRED_OUTPUT_MODE_REPLACE:
- printf(" MODE: REPLACE\n");
- break;
- case MAPRED_OUTPUT_MODE_APPEND:
- printf(" MODE: APPEND\n");
- break;
- default:
- XASSERT(false);
- }
- switch (obj->u.output.type)
- {
- case MAPRED_OUTPUT_NONE:
- break;
- case MAPRED_OUTPUT_FILE:
- printf(" FILE: %s\n", obj->u.output.desc);
- break;
- case MAPRED_OUTPUT_TABLE:
- printf(" TABLE: %s\n", obj->u.output.desc);
- break;
- default:
- XASSERT(false);
- }
- break;
-
- case MAPRED_MAPPER:
- ckind = "MAP";
- /* fallthrough */
-
- case MAPRED_TRANSITION:
- if (!ckind)
- ckind = "TRANSITION";
- /* fallthrough */
-
- case MAPRED_COMBINER:
- if (!ckind)
- ckind = "CONSOLIDATE";
- /* fallthrough */
-
- case MAPRED_FINALIZER:
- if (!ckind)
- ckind = "FINALIZE";
-
- printf(" - %s:\n", ckind);
- if (obj->name)
- printf(" NAME: %s\n", obj->name);
- if (obj->u.function.parameters)
- {
- mapred_plist_t *plist;
- printf(" PARAMETERS:\n");
- for (plist = obj->u.function.parameters; plist;
- plist = plist->next)
- printf(" - %s %s\n", plist->name, plist->type);
- }
- if (obj->u.function.returns)
- {
- mapred_plist_t *plist;
- printf(" RETURNS:\n");
- for (plist = obj->u.function.returns; plist;
- plist = plist->next)
- printf(" - %s %s\n", plist->name, plist->type);
- }
- switch (obj->u.function.mode)
- {
- case MAPRED_MODE_NONE:
- break;
- case MAPRED_MODE_SINGLE:
- printf(" MODE: SINGLE\n");
- break;
- case MAPRED_MODE_MULTI:
- printf(" MODE: MULTI\n");
- break;
- case MAPRED_MODE_ACCUMULATED:
- printf(" MODE: ACCUMULATED\n");
- break;
- case MAPRED_MODE_WINDOWED:
- printf(" MODE: WINDOWED\n");
- break;
- default:
- printf(" MODE: UNKNOWN\n");
- break;
- }
- if (obj->u.function.flags)
- {
- printf(" OPTIMIZE: ");
- if (obj->u.function.flags & mapred_function_strict)
- printf("STRICT ");
- if (obj->u.function.flags & mapred_function_immutable)
- printf("IMMUTABLE ");
- if (obj->u.function.flags & mapred_function_unordered)
- printf("UNORDERED ");
- printf("\n");
- }
- if (obj->u.function.language)
- printf(" LANGUAGE: %s\n", obj->u.function.language);
- if (obj->u.function.body)
- {
- printf(" FUNCTION: |\n");
- printf(" %s\n", obj->u.function.body);
- }
- break;
-
- case MAPRED_REDUCER:
- printf(" - REDUCE:\n");
- if (obj->name)
- printf(" NAME: %s\n", obj->name);
- if (obj->u.reducer.transition.name)
- printf(" TRANSITION: %s\n",
- obj->u.reducer.transition.name);
- if (obj->u.reducer.combiner.name)
- printf(" CONSOLIDATE: %s\n",
- obj->u.reducer.combiner.name);
- if (obj->u.reducer.finalizer.name)
- printf(" FINALIZE: %s\n",
- obj->u.reducer.finalizer.name);
- if (obj->u.reducer.initialize)
- printf(" INITIALIZE: %s\n",
- obj->u.reducer.initialize);
- if (obj->u.reducer.keys)
- {
- mapred_clist_t *clist;
- printf(" KEYS: |\n");
- for (clist = obj->u.reducer.keys; clist; clist = clist->next)
- printf(" - %s\n", clist->value);
- }
- if (obj->u.reducer.ordering)
- {
- mapred_clist_t *clist;
- printf(" ORDERING: |\n");
- for (clist = obj->u.reducer.ordering; clist; clist = clist->next)
- printf(" - %s\n", clist->value);
- }
- break;
-
- case MAPRED_TASK:
- case MAPRED_EXECUTION:
- if (obj->u.task.execute)
- printf(" - RUN:\n");
- else
- printf(" - TASK:\n");
- if (obj->name)
- printf(" NAME: %s\n", obj->name);
- if (obj->u.task.input.name)
- printf(" SOURCE: %s\n", obj->u.task.input.name);
- if (obj->u.task.mapper.name)
- printf(" MAP: %s\n", obj->u.task.mapper.name);
- if (obj->u.task.reducer.name)
- printf(" REDUCE: %s\n", obj->u.task.reducer.name);
- if (obj->u.task.output.name)
- printf(" TARGET: %s\n", obj->u.task.output.name);
- break;
-
- case MAPRED_NO_KIND:
- default:
- XRAISE(MAPRED_PARSE_INTERNAL,
- "Unknown object type");
- }
-}
-
-
-
-int mapred_verify_object(mapred_parser_t *parser, mapred_object_t *obj)
-{
- char *name;
- int error = NO_ERROR;
-
- XASSERT(obj);
-
- /* Verify that all required fields are present and valid */
- name = obj->name ? obj->name : "unnamed";
- switch (obj->kind)
- {
- case MAPRED_DOCUMENT:
-
- /*
- * If there is a version on the document then it should have
- * been validated by parser_set_version()
- */
- if (!obj->u.document.version)
- {
- error = mapred_obj_error(obj, "Missing VERSION",
- parser->doc_number);
- }
-
- break;
-
- case MAPRED_INPUT:
-
- /* Validate required fields */
- if (!obj->name)
- error = mapred_obj_error(obj, "Missing NAME");
- if (obj->u.input.type == MAPRED_INPUT_NONE)
- error = mapred_obj_error(obj,
- "Missing FILE, GPFDIST, TABLE, QUERY, or EXEC");
-
- /* set default values */
- if (error == NO_ERROR)
- {
- if (!obj->u.input.columns)
- {
- obj->u.input.columns = malloc(sizeof(mapred_plist_t));
- obj->u.input.columns->name = copyscalar("value");
- obj->u.input.columns->type = copyscalar("text");
- obj->u.input.columns->next = NULL;
- }
- if (!obj->u.input.columns->next &&
- !obj->u.input.delimiter)
- {
- obj->u.input.delimiter = copyscalar("off");
- }
- }
- break;
-
- case MAPRED_OUTPUT:
-
- if (!obj->name)
- error = mapred_obj_error(obj, "Missing NAME");
- if (obj->u.output.type == MAPRED_OUTPUT_NONE)
- error = mapred_obj_error(obj, "Missing FILE or TABLE");
- break;
-
- case MAPRED_MAPPER:
- case MAPRED_TRANSITION:
- case MAPRED_COMBINER:
- case MAPRED_FINALIZER:
-
- if (!obj->name)
- error = mapred_obj_error(obj, "Missing NAME");
-
- /*
- * We now support "builtin" functions, which are specified by a lack
- * of an implementation language. If a language is specified then
- * a function body is still required. If a language is not specified
- * then the function body just defaults to the name of the function.
- */
- if (obj->name && !obj->u.function.language && !obj->u.function.body)
- obj->u.function.body = copyscalar(obj->name);
-
- if (obj->u.function.language && !obj->u.function.body)
- error = mapred_obj_error(obj, "Missing FUNCTION");
-
- /*
- * LIBRARY is required for "C" language functions.
- * LIBRARY is invalid for any other language.
- *
- * It would be good to verify that LIBRARY is not used in
- * older YAML formats, but that is difficult given the current
- * structure of the code.
- */
- if (obj->u.function.language)
- {
- if (obj->u.function.library)
- {
- if (strcasecmp("C", obj->u.function.language))
- {
- error = mapred_obj_error(obj, "LIBRARY is invalid for "
- "%s LANGUAGE functions",
- obj->u.function.language);
- }
- }
- else if (!strcasecmp("C", obj->u.function.language))
- {
- error = mapred_obj_error(obj, "Missing LIBRARY");
- }
-
- /*
- * Don't bother filling in default arguments if we already have
- * an error.
- */
- if (error)
- break;
-
- /*
- * Set default values.
- * For builtin functions we delay this so that we can lookup the
- * function in the catalog to determine the defaults.
- */
- if (!obj->u.function.parameters)
- {
- const char *name = default_parameter_names[obj->kind][0];
- name = default_parameter_names[obj->kind][0];
- obj->u.function.parameters = malloc(sizeof(mapred_plist_t));
- obj->u.function.parameters->type = copyscalar("text");
- obj->u.function.parameters->name = copyscalar(name);
- obj->u.function.parameters->next = NULL;
-
- name = default_parameter_names[obj->kind][1];
- if (name)
- {
- obj->u.function.parameters->next = malloc(sizeof(mapred_plist_t));
- obj->u.function.parameters->next->type = copyscalar("text");
- obj->u.function.parameters->next->name = copyscalar(name);
- obj->u.function.parameters->next->next = NULL;
- }
- }
- else
- {
- switch (obj->kind)
- {
- case MAPRED_TRANSITION:
- if (!obj->u.function.parameters->next)
- {
- error = mapred_obj_error(
- obj,
- "requires at least 2 input parameters [state, arg1, ...]"
- );
- }
- break;
-
- case MAPRED_COMBINER:
- if (!obj->u.function.parameters->next ||
- obj->u.function.parameters->next->next)
- {
- error = mapred_obj_error(
- obj,
- "requires exactly 2 input parameters [state1, state2]"
- );
- }
- break;
-
- case MAPRED_FINALIZER:
- if (obj->u.function.parameters->next)
- {
- error = mapred_obj_error(
- obj,
- "requires exactly 1 input parameter [state]"
- );
- }
- break;
-
- case MAPRED_MAPPER:
- default:
- break;
- }
- }
-
- if (!obj->u.function.returns)
- {
- const char *name = default_return_names[obj->kind][0];
- obj->u.function.returns = malloc(sizeof(mapred_plist_t));
- obj->u.function.returns->type = copyscalar("text");
- obj->u.function.returns->name = copyscalar(name);
- obj->u.function.returns->next = NULL;
-
- name = default_return_names[obj->kind][1];
- if (name)
- {
- obj->u.function.returns->next = malloc(sizeof(mapred_plist_t));
- obj->u.function.returns->next->type = copyscalar("text");
- obj->u.function.returns->next->name = copyscalar(name);
- obj->u.function.returns->next->next = NULL;
- }
- }
- else if (obj->kind == MAPRED_TRANSITION ||
- obj->kind == MAPRED_COMBINER)
- {
- if (obj->u.function.returns->next)
- {
- error = mapred_obj_error(
- obj,
- "requires exactly one output parameter [state]"
- );
- }
- }
-
- /* Set default mode: depends on type of function */
- if (obj->u.function.mode == MAPRED_MODE_NONE)
- {
- if (obj->kind == MAPRED_TRANSITION ||
- obj->kind == MAPRED_COMBINER)
- {
- obj->u.function.mode = MAPRED_MODE_SINGLE;
- }
- else
- {
- obj->u.function.mode = MAPRED_MODE_MULTI;
- }
- }
- }
- break;
-
- case MAPRED_REDUCER:
-
- if (!obj->name)
- error = mapred_obj_error(obj, "Missing NAME");
- if (!obj->u.reducer.transition.name)
- error = mapred_obj_error(obj, "Missing TRANSITION");
- /*
- * Will verify that functions are valid for reducer input after we
- * have resolved the pointers.
- */
-
- /*
- * It would be good to verify that ORDERING is not used in
- * older YAML formats, but that is difficult given the current
- * structure of the code.
- */
-
- /*
- * ORDERING and COMBINER are incompatible
- */
- if (obj->u.reducer.ordering != NULL &&
- obj->u.reducer.combiner.name)
- {
- error = mapred_obj_error(obj,
- "REDUCERS cannot specify both a COMBINER "
- "function and an ORDERING specification");
- }
-
- /* Setup default "keys" */
- if (!obj->u.reducer.keys)
- {
- obj->u.reducer.keys = malloc(sizeof(mapred_clist_t));
- obj->u.reducer.keys->value = copyscalar("key");
- obj->u.reducer.keys->next = malloc(sizeof(mapred_clist_t));
- obj->u.reducer.keys->next->next = NULL;
- obj->u.reducer.keys->next->value = copyscalar("*");
- }
-
- break;
-
- case MAPRED_TASK:
- if (!obj->name)
- error = mapred_obj_error(obj, "Missing NAME");
-
- /* Fallthrough */
-
- case MAPRED_EXECUTION:
-
- if (!obj->u.task.input.name)
- error = mapred_obj_error(obj, "Missing SOURCE");
-
- /* IDENTITY Mappers and Reducers */
- if (obj->u.task.mapper.name &&
- !strcasecmp("IDENTITY", obj->u.task.mapper.name))
- {
- free(obj->u.task.mapper.name);
- obj->u.task.mapper.name = NULL;
- }
- if (obj->u.task.reducer.name &&
- !strcasecmp("IDENTITY", obj->u.task.reducer.name))
- {
- free(obj->u.task.reducer.name);
- obj->u.task.reducer.name = NULL;
- }
-
- /* STDOUT Output */
- if (obj->u.task.output.name &&
- !strcasecmp("STDOUT", obj->u.task.output.name))
- {
- free(obj->u.task.output.name);
- obj->u.task.output.name = NULL;
- }
- break;
-
- case MAPRED_NO_KIND:
- default:
- XASSERT(false);
- }
-
- return error;
-}
-
-
-